From bcfc9fca0c57df656b6619951e6decadc04f76bb Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 14 Jan 2025 15:29:10 +0530 Subject: [PATCH 1/9] KAFKA-18513: Validate share state topic record count in tests. --- .../kafka/test/api/ShareConsumerTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 2107cdd7718dc..22af8610355ba 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaShareConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -44,6 +45,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -77,6 +79,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -95,6 +98,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; @Timeout(1200) @Tag("integration") @@ -251,6 +255,7 @@ 
public void testSubscriptionAndPoll(String persister) { shareConsumer.subscribe(Collections.singleton(tp.topic())); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 1); } } @@ -273,6 +278,7 @@ public void testSubscriptionAndPollMultiple(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -339,6 +345,7 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe // We expect null exception as the acknowledgment error code is null. assertNull(partitionExceptionMap.get(tp)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -369,6 +376,7 @@ public void testAcknowledgementCommitCallbackOnClose(String persister) { // We expect null exception as the acknowledgment error code is null. assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -1922,6 +1930,40 @@ private void warmup() throws InterruptedException { } } + private void maybeVerifyShareGroupStateTopicRecordCount(String persister, int messageCount) { + if (!persister.equals(DEFAULT_STATE_PERSISTER)) { + return; + } + try { + TestUtils.waitForCondition(() -> + !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), + DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); + Map consumerConfigs = new HashMap<>(); + consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName()); + consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + try (KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs)) { + consumer.subscribe(Collections.singleton(Topic.SHARE_GROUP_STATE_TOPIC_NAME)); + Set> records = new HashSet<>(); + TestUtils.waitForCondition(() -> { + ConsumerRecords msgs = consumer.poll(Duration.ofMillis(5000L)); + if (msgs.count() > 0) { + msgs.records(Topic.SHARE_GROUP_STATE_TOPIC_NAME).forEach(records::add); + } + return records.size() == messageCount + 2; // +2 because of extra warmup records + }, + DEFAULT_MAX_WAIT_MS, + 1000L, + () -> "no records in " + Topic.SHARE_GROUP_STATE_TOPIC_NAME + ); + } + } catch (InterruptedException e) { + fail(e); + } + } + private void alterShareAutoOffsetReset(String groupId, String newValue) { ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); Map> alterEntries = new HashMap<>(); From d1d88c5a142464ec74f862e2421049ae579479c2 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 14 Jan 2025 17:09:06 +0530 Subject: [PATCH 2/9] incorporated comment, updated all tests. 
--- .../kafka/test/api/ShareConsumerTest.java | 57 ++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 22af8610355ba..c1e6d983c4884 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -94,6 +94,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -173,6 +174,7 @@ public void testSubscribeAndPollNoRecords(String persister) { assertEquals(subscription, shareConsumer.subscription()); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 1); } } @@ -188,6 +190,7 @@ public void testSubscribePollUnsubscribe(String persister) { shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 1); } } @@ -205,6 +208,7 @@ public void testSubscribePollSubscribe(String persister) { assertEquals(subscription, shareConsumer.subscription()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 1); } } @@ -222,6 +226,7 @@ public void testSubscribeUnsubscribePollFails(String persister) { // "Consumer is not subscribed to any topics." 
assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 1); // due to leader epoch in read } } @@ -239,6 +244,7 @@ public void testSubscribeSubscribeEmptyPollFails(String persister) { // "Consumer is not subscribed to any topics." assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 1); // due to leader epoch in read } } @@ -255,7 +261,7 @@ public void testSubscriptionAndPoll(String persister) { shareConsumer.subscribe(Collections.singleton(tp.topic())); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 1); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -315,6 +321,7 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws // Verifying if the callback was invoked without exceptions for the partitions for both topics. 
assertNull(partitionExceptionMap.get(tp)); assertNull(partitionExceptionMap.get(tp2)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 4); } } @@ -459,6 +466,7 @@ public void testHeaders(String persister) { if (header != null) assertEquals("headerValue", new String(header.value())); } + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -483,6 +491,7 @@ private void testHeadersSerializeDeserialize(Serializer serializer, Dese @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testHeadersSerializerDeserializer(String persister) { testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -515,6 +524,7 @@ public void testMaxPollRecords(String persister) { i++; } + maybeVerifyShareGroupStateTopicRecordCount(persister, 4); } } @@ -560,6 +570,7 @@ public void testControlRecordsSkipped(String persister) throws Exception { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -581,6 +592,7 @@ public void testExplicitAcknowledgeSuccess(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -604,6 +616,7 @@ public void testExplicitAcknowledgeCommitSuccess(String persister) { assertEquals(1, result.size()); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -660,6 +673,7 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte }, 30000, 100L, () -> "Didn't receive call to callback"); assertNull(partitionExceptionMap1.get(tp)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 
4); } } @@ -723,6 +737,7 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -746,6 +761,7 @@ public void testExplicitAcknowledgeReleasePollAccept(String persister) { records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -766,6 +782,7 @@ public void testExplicitAcknowledgeReleaseAccept(String persister) { records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -784,6 +801,7 @@ public void testExplicitAcknowledgeReleaseClose(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -806,6 +824,7 @@ public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -827,6 +846,7 @@ public void testImplicitAcknowledgeFailsExplicit(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -850,6 
+870,7 @@ public void testImplicitAcknowledgeCommitSync(String persister) { assertEquals(0, result.size()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -889,6 +910,7 @@ public void testImplicitAcknowledgementCommitAsync(String persister) throws Inte }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit callback did not receive the response yet"); assertNull(partitionExceptionMap1.get(tp)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -911,6 +933,7 @@ public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -962,6 +985,7 @@ public void testMultipleConsumersWithDifferentGroupIds(String persister) throws int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); return records1 == 3 && records2 == 5; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers for the last batch"); + maybeVerifyShareGroupStateTopicRecordCount(persister, 7); } } @@ -999,6 +1023,7 @@ public void testMultipleConsumersInGroupSequentialConsumption(String persister) } assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); + maybeVerifyShareGroupStateTopicRecordCount(persister, 4); } } @@ -1099,6 +1124,7 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe assertEquals(totalMessagesSent, totalResult2); assertEquals(totalMessagesSent, totalResult3); assertEquals(totalMessagesSent, actualMessageSent); + maybeVerifyShareGroupStateTopicRecordCount(persister, 21); } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -1145,6 +1171,7 @@ public void testConsumerCloseInGroupSequential(String persister) { } shareConsumer2.close(); 
assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -1191,6 +1218,7 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers int totalSuccessResult = consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum(); assertEquals(producerCount * messagesPerProducer, totalSuccessResult); + maybeVerifyShareGroupStateTopicRecordCount(persister, 5); } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -1250,6 +1278,7 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr consumerRecords = shareConsumer.poll(Duration.ofMillis(1000)); assertEquals(0, consumerRecords.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 4); } } @@ -1277,6 +1306,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String // The acknowledgement commit callback will be called and the exception is thrown. // This is verified inside the onComplete() method implementation. 
shareConsumer.poll(Duration.ofMillis(500)); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -1332,6 +1362,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String per } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -1378,6 +1409,7 @@ public void testAcknowledgementCommitCallbackThrowsException(String persister) t } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -1453,6 +1485,7 @@ public void testWakeupWithFetchedRecordsAvailable(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -1482,6 +1515,7 @@ public void testSubscriptionFollowedByTopicCreation(String persister) throws Int producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 4); } } @@ -1524,6 +1558,7 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr producer.send(recordTopic2).get(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); + maybeVerifyShareGroupStateTopicRecordCount(persister, 7); } } @@ -1566,6 +1601,7 @@ public void testLsoMovementByRecordsDeletion(String persister) { messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 1, 5, true); assertEquals(0, messageCount); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -1592,6 +1628,7 @@ public void testShareAutoOffsetResetDefaultValue(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // Now the next record should be 
consumed successfully assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -1617,6 +1654,7 @@ public void testShareAutoOffsetResetEarliest(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // The next records should also be consumed successfully assertEquals(1, records.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 3); } } @@ -1641,6 +1679,7 @@ public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) { int consumedMessageCount = consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true); // The records returned belong to offsets 5-9. assertEquals(5, consumedMessageCount); + maybeVerifyShareGroupStateTopicRecordCount(persister, 2); } } @@ -1681,6 +1720,7 @@ public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String pers records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); // The next record should also be consumed successfully by group2 assertEquals(1, records2.count()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 5); } } @@ -1732,6 +1772,7 @@ public void testShareAutoOffsetResetByDuration(String persister) throws Exceptio shareConsumer.subscribe(Collections.singleton(tp.topic())); List> records = consumeRecords(shareConsumer, 3); assertEquals(3, records.size()); + maybeVerifyShareGroupStateTopicRecordCount(persister, 4); } } @@ -1747,14 +1788,14 @@ public void testShareAutoOffsetResetByDurationInvalidFormat(String persister) th GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"), AlterConfigOp.OpType.SET))); ExecutionException e1 = assertThrows(ExecutionException.class, () -> adminClient.incrementalAlterConfigs(alterEntries).all().get()); - assertTrue(e1.getCause() instanceof InvalidConfigurationException); + assertInstanceOf(InvalidConfigurationException.class, e1.getCause()); // Test negative duration alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry( 
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT1H"), AlterConfigOp.OpType.SET))); ExecutionException e2 = assertThrows(ExecutionException.class, () -> adminClient.incrementalAlterConfigs(alterEntries).all().get()); - assertTrue(e2.getCause() instanceof InvalidConfigurationException); + assertInstanceOf(InvalidConfigurationException.class, e2.getCause()); } private int produceMessages(int messageCount) { @@ -1945,18 +1986,18 @@ private void maybeVerifyShareGroupStateTopicRecordCount(String persister, int me consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); try (KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs)) { - consumer.subscribe(Collections.singleton(Topic.SHARE_GROUP_STATE_TOPIC_NAME)); + consumer.subscribe(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)); Set> records = new HashSet<>(); TestUtils.waitForCondition(() -> { ConsumerRecords msgs = consumer.poll(Duration.ofMillis(5000L)); if (msgs.count() > 0) { msgs.records(Topic.SHARE_GROUP_STATE_TOPIC_NAME).forEach(records::add); } - return records.size() == messageCount + 2; // +2 because of extra warmup records + return records.size() == (messageCount + 2); // +2 because of extra warmup records }, - DEFAULT_MAX_WAIT_MS, - 1000L, - () -> "no records in " + Topic.SHARE_GROUP_STATE_TOPIC_NAME + 30000L, + 200L, + () -> String.format("records found %d but expected %d", Math.max(0, records.size() - 2), messageCount) ); } } catch (InterruptedException e) { From 76d867263e04d529f323cf631f6ca3263fb986ae Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 14 Jan 2025 18:36:59 +0530 Subject: [PATCH 3/9] use assign and seektobeginning over subscribe and offset config --- core/src/test/java/kafka/test/api/ShareConsumerTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index fc113e2e9f4fb..5afaa848853ca 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -110,6 +110,7 @@ public class ShareConsumerTest { private final TopicPartition warmupTp = new TopicPartition("warmup", 0); private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister"; private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister"; + private List sgsTopicPartitions; private Admin adminClient; @@ -145,6 +146,9 @@ public void createCluster(TestInfo testInfo) throws Exception { createTopic("topic"); createTopic("topic2"); adminClient = createAdminClient(); + sgsTopicPartitions = IntStream.range(0, 3) + .mapToObj(part -> new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part)) + .toList(); warmup(); } @@ -1984,9 +1988,10 @@ private void maybeVerifyShareGroupStateTopicRecordCount(String persister, int me consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + try (KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs)) { - consumer.subscribe(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)); + consumer.assign(sgsTopicPartitions); + consumer.seekToBeginning(sgsTopicPartitions); Set> records = new HashSet<>(); TestUtils.waitForCondition(() -> { ConsumerRecords msgs = consumer.poll(Duration.ofMillis(5000L)); From 729793bd892d728bd71fef258fe4e226cd5f61cf Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 14 Jan 2025 22:17:32 +0530 Subject: [PATCH 4/9] fix verify condition for multi cons, multi group 
concurrent consumption. --- core/src/test/java/kafka/test/api/ShareConsumerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 5afaa848853ca..c338205d1b09a 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -1128,7 +1128,7 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe assertEquals(totalMessagesSent, totalResult2); assertEquals(totalMessagesSent, totalResult3); assertEquals(totalMessagesSent, actualMessageSent); - maybeVerifyShareGroupStateTopicRecordCount(persister, 21); + maybeVerifyShareGroupStateTopicRecordCount(persister, 12); } @ParameterizedTest(name = "{displayName}.persister={0}") From af187a3cb876e7b0da671c14bbfcd20e955dd4c3 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Tue, 14 Jan 2025 22:31:27 +0530 Subject: [PATCH 5/9] remove SGS verification from multi cons sequential test. --- core/src/test/java/kafka/test/api/ShareConsumerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index c338205d1b09a..d59b7abd04c15 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -1027,7 +1027,6 @@ public void testMultipleConsumersInGroupSequentialConsumption(String persister) } assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); - maybeVerifyShareGroupStateTopicRecordCount(persister, 4); } } From 6413ada19009d232b9a4dd7f8dc62ea69fc774c2 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Wed, 15 Jan 2025 10:45:39 +0530 Subject: [PATCH 6/9] relax condition for exact record match. 
--- .../kafka/test/api/ShareConsumerTest.java | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index d59b7abd04c15..7ed856d0c1432 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -178,7 +178,7 @@ public void testSubscribeAndPollNoRecords(String persister) { assertEquals(subscription, shareConsumer.subscription()); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 1); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -194,7 +194,7 @@ public void testSubscribePollUnsubscribe(String persister) { shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 1); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -212,7 +212,7 @@ public void testSubscribePollSubscribe(String persister) { assertEquals(subscription, shareConsumer.subscription()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 1); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -230,7 +230,7 @@ public void testSubscribeUnsubscribePollFails(String persister) { // "Consumer is not subscribed to any topics." 
assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 1); // due to leader epoch in read + maybeVerifyShareGroupStateTopicRecordCount(persister); // due to leader epoch in read } } @@ -248,7 +248,7 @@ public void testSubscribeSubscribeEmptyPollFails(String persister) { // "Consumer is not subscribed to any topics." assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 1); // due to leader epoch in read + maybeVerifyShareGroupStateTopicRecordCount(persister); // due to leader epoch in read } } @@ -265,7 +265,7 @@ public void testSubscriptionAndPoll(String persister) { shareConsumer.subscribe(Collections.singleton(tp.topic())); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -288,7 +288,7 @@ public void testSubscriptionAndPollMultiple(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -325,7 +325,7 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws // Verifying if the callback was invoked without exceptions for the partitions for both topics. 
assertNull(partitionExceptionMap.get(tp)); assertNull(partitionExceptionMap.get(tp2)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 4); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -356,7 +356,7 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe // We expect null exception as the acknowledgment error code is null. assertNull(partitionExceptionMap.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -387,7 +387,7 @@ public void testAcknowledgementCommitCallbackOnClose(String persister) { // We expect null exception as the acknowledgment error code is null. assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -470,7 +470,7 @@ public void testHeaders(String persister) { if (header != null) assertEquals("headerValue", new String(header.value())); } - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -495,7 +495,7 @@ private void testHeadersSerializeDeserialize(Serializer serializer, Dese @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) public void testHeadersSerializerDeserializer(String persister) { testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -528,7 +528,7 @@ public void testMaxPollRecords(String persister) { i++; } - maybeVerifyShareGroupStateTopicRecordCount(persister, 4); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -574,7 +574,7 @@ public void testControlRecordsSkipped(String persister) throws Exception { 
records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -596,7 +596,7 @@ public void testExplicitAcknowledgeSuccess(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -620,7 +620,7 @@ public void testExplicitAcknowledgeCommitSuccess(String persister) { assertEquals(1, result.size()); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -677,7 +677,7 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte }, 30000, 100L, () -> "Didn't receive call to callback"); assertNull(partitionExceptionMap1.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 4); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -741,7 +741,7 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -765,7 +765,7 @@ public void testExplicitAcknowledgeReleasePollAccept(String persister) { records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -786,7 +786,7 @@ public void testExplicitAcknowledgeReleaseAccept(String persister) { 
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -805,7 +805,7 @@ public void testExplicitAcknowledgeReleaseClose(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -828,7 +828,7 @@ public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -850,7 +850,7 @@ public void testImplicitAcknowledgeFailsExplicit(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -874,7 +874,7 @@ public void testImplicitAcknowledgeCommitSync(String persister) { assertEquals(0, result.size()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -914,7 +914,7 @@ public void testImplicitAcknowledgementCommitAsync(String persister) throws Inte }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit 
callback did not receive the response yet"); assertNull(partitionExceptionMap1.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -937,7 +937,7 @@ public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(2, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -989,7 +989,7 @@ public void testMultipleConsumersWithDifferentGroupIds(String persister) throws int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); return records1 == 3 && records2 == 5; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers for the last batch"); - maybeVerifyShareGroupStateTopicRecordCount(persister, 7); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1127,7 +1127,7 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe assertEquals(totalMessagesSent, totalResult2); assertEquals(totalMessagesSent, totalResult3); assertEquals(totalMessagesSent, actualMessageSent); - maybeVerifyShareGroupStateTopicRecordCount(persister, 12); + maybeVerifyShareGroupStateTopicRecordCount(persister); } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -1174,7 +1174,7 @@ public void testConsumerCloseInGroupSequential(String persister) { } shareConsumer2.close(); assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1221,7 +1221,7 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers int totalSuccessResult = consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum(); assertEquals(producerCount * messagesPerProducer, 
totalSuccessResult); - maybeVerifyShareGroupStateTopicRecordCount(persister, 5); + maybeVerifyShareGroupStateTopicRecordCount(persister); } @ParameterizedTest(name = "{displayName}.persister={0}") @@ -1281,7 +1281,7 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr consumerRecords = shareConsumer.poll(Duration.ofMillis(1000)); assertEquals(0, consumerRecords.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 4); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1309,7 +1309,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String // The acknowledgement commit callback will be called and the exception is thrown. // This is verified inside the onComplete() method implementation. shareConsumer.poll(Duration.ofMillis(500)); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1365,7 +1365,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String per } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1412,7 +1412,7 @@ public void testAcknowledgementCommitCallbackThrowsException(String persister) t } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1488,7 +1488,7 @@ public void testWakeupWithFetchedRecordsAvailable(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1518,7 +1518,7 @@ public void 
testSubscriptionFollowedByTopicCreation(String persister) throws Int producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 4); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1561,7 +1561,7 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr producer.send(recordTopic2).get(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); - maybeVerifyShareGroupStateTopicRecordCount(persister, 7); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1604,7 +1604,7 @@ public void testLsoMovementByRecordsDeletion(String persister) { messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 1, 5, true); assertEquals(0, messageCount); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1631,7 +1631,7 @@ public void testShareAutoOffsetResetDefaultValue(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // Now the next record should be consumed successfully assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1657,7 +1657,7 @@ public void testShareAutoOffsetResetEarliest(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // The next records should also be consumed successfully assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 3); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1682,7 +1682,7 @@ public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) { int consumedMessageCount = consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true); // The records returned belong to offsets 5-9. 
assertEquals(5, consumedMessageCount); - maybeVerifyShareGroupStateTopicRecordCount(persister, 2); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1723,7 +1723,7 @@ public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String pers records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); // The next record should also be consumed successfully by group2 assertEquals(1, records2.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 5); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1775,7 +1775,7 @@ public void testShareAutoOffsetResetByDuration(String persister) throws Exceptio shareConsumer.subscribe(Collections.singleton(tp.topic())); List> records = consumeRecords(shareConsumer, 3); assertEquals(3, records.size()); - maybeVerifyShareGroupStateTopicRecordCount(persister, 4); + maybeVerifyShareGroupStateTopicRecordCount(persister); } } @@ -1974,7 +1974,7 @@ private void warmup() throws InterruptedException { } } - private void maybeVerifyShareGroupStateTopicRecordCount(String persister, int messageCount) { + private void maybeVerifyShareGroupStateTopicRecordCount(String persister) { if (!persister.equals(DEFAULT_STATE_PERSISTER)) { return; } @@ -1997,11 +1997,11 @@ private void maybeVerifyShareGroupStateTopicRecordCount(String persister, int me if (msgs.count() > 0) { msgs.records(Topic.SHARE_GROUP_STATE_TOPIC_NAME).forEach(records::add); } - return records.size() == (messageCount + 2); // +2 because of extra warmup records + return records.size() > 2; // > 2 because warmup itself produces 2 records; any more means the test produced state records }, 30000L, 200L, - () -> String.format("records found %d but expected %d", Math.max(0, records.size() - 2), messageCount) + () -> "no records produced" ); } } catch (InterruptedException e) { From cc31864e11b9deef2fb7f9ac8db18d35e2a60f13 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Wed, 15 Jan 2025 14:50:26 +0530 Subject: [PATCH 7/9] remove no_op persister, change sgs record verifier name, remove test
params --- .../kafka/test/api/ShareConsumerTest.java | 351 ++++++++---------- 1 file changed, 146 insertions(+), 205 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 7ed856d0c1432..e6f98b31a1034 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -61,10 +61,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; @@ -109,17 +108,12 @@ public class ShareConsumerTest { private final TopicPartition tp2 = new TopicPartition("topic2", 0); private final TopicPartition warmupTp = new TopicPartition("warmup", 0); private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister"; - private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister"; private List sgsTopicPartitions; private Admin adminClient; @BeforeEach public void createCluster(TestInfo testInfo) throws Exception { - String persisterClassName = NO_OP_PERSISTER; - if (testInfo.getDisplayName().contains(".persister=")) { - persisterClassName = testInfo.getDisplayName().split("=")[1]; - } cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder() .setNumBrokerNodes(1) @@ -129,7 +123,7 @@ public void createCluster(TestInfo testInfo) throws Exception { .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share") .setConfigProp("group.share.enable", "true") .setConfigProp("group.share.partition.max.record.locks", "10000") - .setConfigProp("group.share.persister.class.name", 
persisterClassName) + .setConfigProp("group.share.persister.class.name", DEFAULT_STATE_PERSISTER) .setConfigProp("group.share.record.lock.duration.ms", "15000") .setConfigProp("offsets.topic.replication.factor", "1") .setConfigProp("share.coordinator.state.topic.min.isr", "1") @@ -158,9 +152,8 @@ public void destroyCluster() throws Exception { cluster.close(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testPollNoSubscribeFails(String persister) { + @Test + public void testPollNoSubscribeFails() { try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { assertEquals(Collections.emptySet(), shareConsumer.subscription()); // "Consumer is not subscribed to any topics." @@ -168,9 +161,8 @@ public void testPollNoSubscribeFails(String persister) { } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeAndPollNoRecords(String persister) { + @Test + public void testSubscribeAndPollNoRecords() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -178,13 +170,12 @@ public void testSubscribeAndPollNoRecords(String persister) { assertEquals(subscription, shareConsumer.subscription()); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribePollUnsubscribe(String persister) { + @Test + public void 
testSubscribePollUnsubscribe() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -194,13 +185,12 @@ public void testSubscribePollUnsubscribe(String persister) { shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribePollSubscribe(String persister) { + @Test + public void testSubscribePollSubscribe() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -212,13 +202,12 @@ public void testSubscribePollSubscribe(String persister) { assertEquals(subscription, shareConsumer.subscription()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeUnsubscribePollFails(String persister) { + @Test + public void testSubscribeUnsubscribePollFails() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -230,13 +219,12 @@ public void testSubscribeUnsubscribePollFails(String persister) { // "Consumer is not 
subscribed to any topics." assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); // due to leader epoch in read + verifyShareGroupStateTopicRecordsProduced(); // due to leader epoch in read } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeSubscribeEmptyPollFails(String persister) { + @Test + public void testSubscribeSubscribeEmptyPollFails() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -248,13 +236,12 @@ public void testSubscribeSubscribeEmptyPollFails(String persister) { // "Consumer is not subscribed to any topics." assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); // due to leader epoch in read + verifyShareGroupStateTopicRecordsProduced(); // due to leader epoch in read } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionAndPoll(String persister) { + @Test + public void testSubscriptionAndPoll() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -265,13 +252,12 @@ public void testSubscriptionAndPoll(String persister) { shareConsumer.subscribe(Collections.singleton(tp.topic())); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, 
records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionAndPollMultiple(String persister) { + @Test + public void testSubscriptionAndPollMultiple() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -288,13 +274,12 @@ public void testSubscriptionAndPollMultiple(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementSentOnSubscriptionChange(String persister) throws ExecutionException, InterruptedException { + @Test + public void testAcknowledgementSentOnSubscriptionChange() throws ExecutionException, InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -325,13 +310,12 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws // Verifying if the callback was invoked without exceptions for the partitions for both topics. 
assertNull(partitionExceptionMap.get(tp)); assertNull(partitionExceptionMap.get(tp2)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) throws Exception { + @Test + public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() throws Exception { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -356,13 +340,12 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe // We expect null exception as the acknowledgment error code is null. assertNull(partitionExceptionMap.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackOnClose(String persister) { + @Test + public void testAcknowledgementCommitCallbackOnClose() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -387,14 +370,13 @@ public void testAcknowledgementCommitCallbackOnClose(String persister) { // We expect null exception as the acknowledgment error code is null. 
assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } @Flaky("KAFKA-18033") - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackInvalidRecordStateException(String persister) throws Exception { + @Test + public void testAcknowledgementCommitCallbackInvalidRecordStateException() throws Exception { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -447,9 +429,8 @@ public void onComplete(Map> offsetsMap, Exception ex } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testHeaders(String persister) { + @Test + public void testHeaders() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -470,7 +451,7 @@ public void testHeaders(String persister) { if (header != null) assertEquals("headerValue", new String(header.value())); } - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -491,16 +472,14 @@ private void testHeadersSerializeDeserialize(Serializer serializer, Dese } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testHeadersSerializerDeserializer(String persister) { + @Test + public void 
testHeadersSerializerDeserializer() { testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMaxPollRecords(String persister) { + @Test + public void testMaxPollRecords() { int numRecords = 10000; int maxPollRecords = 2; @@ -528,13 +507,12 @@ public void testMaxPollRecords(String persister) { i++; } - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testControlRecordsSkipped(String persister) throws Exception { + @Test + public void testControlRecordsSkipped() throws Exception { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer transactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1"); KafkaProducer nonTransactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); @@ -574,13 +552,12 @@ public void testControlRecordsSkipped(String persister) throws Exception { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeSuccess(String persister) { + @Test + public void testExplicitAcknowledgeSuccess() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -596,13 +573,12 @@ public void testExplicitAcknowledgeSuccess(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeCommitSuccess(String persister) { + @Test + public void testExplicitAcknowledgeCommitSuccess() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -620,13 +596,12 @@ public void testExplicitAcknowledgeCommitSuccess(String persister) { assertEquals(1, result.size()); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgementCommitAsync(String persister) throws InterruptedException { + @Test + public void testExplicitAcknowledgementCommitAsync() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -677,13 +652,12 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte }, 30000, 100L, () -> "Didn't receive call to 
callback"); assertNull(partitionExceptionMap1.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) { + @Test + public void testExplicitAcknowledgementCommitAsyncPartialBatch() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -741,13 +715,12 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeReleasePollAccept(String persister) { + @Test + public void testExplicitAcknowledgeReleasePollAccept() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -765,13 +738,12 @@ public void testExplicitAcknowledgeReleasePollAccept(String persister) { records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name 
= "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeReleaseAccept(String persister) { + @Test + public void testExplicitAcknowledgeReleaseAccept() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -786,13 +758,12 @@ public void testExplicitAcknowledgeReleaseAccept(String persister) { records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeReleaseClose(String persister) { + @Test + public void testExplicitAcknowledgeReleaseClose() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -805,13 +776,12 @@ public void testExplicitAcknowledgeReleaseClose(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void 
testExplicitAcknowledgeThrowsNotInBatch(String persister) { + @Test + public void testExplicitAcknowledgeThrowsNotInBatch() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -828,13 +798,12 @@ public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testImplicitAcknowledgeFailsExplicit(String persister) { + @Test + public void testImplicitAcknowledgeFailsExplicit() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -850,13 +819,12 @@ public void testImplicitAcknowledgeFailsExplicit(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testImplicitAcknowledgeCommitSync(String persister) { + @Test + public void testImplicitAcknowledgeCommitSync() { alterShareAutoOffsetReset("group1", 
"earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -874,13 +842,12 @@ public void testImplicitAcknowledgeCommitSync(String persister) { assertEquals(0, result.size()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testImplicitAcknowledgementCommitAsync(String persister) throws InterruptedException { + @Test + public void testImplicitAcknowledgementCommitAsync() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -914,13 +881,12 @@ public void testImplicitAcknowledgementCommitAsync(String persister) throws Inte }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit callback did not receive the response yet"); assertNull(partitionExceptionMap1.get(tp)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) throws Exception { + @Test + public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws Exception { int maxPartitionFetchBytes = 10000; alterShareAutoOffsetReset("group1", "earliest"); @@ -937,13 +903,12 @@ public void 
testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(2, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersWithDifferentGroupIds(String persister) throws InterruptedException { + @Test + public void testMultipleConsumersWithDifferentGroupIds() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); alterShareAutoOffsetReset("group2", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); @@ -989,13 +954,12 @@ public void testMultipleConsumersWithDifferentGroupIds(String persister) throws int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); return records1 == 3 && records2 == 5; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers for the last batch"); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInGroupSequentialConsumption(String persister) { + @Test + public void testMultipleConsumersInGroupSequentialConsumption() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -1031,9 +995,8 @@ public void testMultipleConsumersInGroupSequentialConsumption(String persister) } @Flaky("KAFKA-18033") - @ParameterizedTest(name = 
"{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInGroupConcurrentConsumption(String persister) + @Test + public void testMultipleConsumersInGroupConcurrentConsumption() throws InterruptedException, ExecutionException, TimeoutException { AtomicInteger totalMessagesConsumed = new AtomicInteger(0); @@ -1066,9 +1029,8 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) assertEquals(producerCount * messagesPerProducer, totalResult); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister) + @Test + public void testMultipleConsumersInMultipleGroupsConcurrentConsumption() throws ExecutionException, InterruptedException, TimeoutException { AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0); AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0); @@ -1127,12 +1089,11 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe assertEquals(totalMessagesSent, totalResult2); assertEquals(totalMessagesSent, totalResult3); assertEquals(totalMessagesSent, actualMessageSent); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testConsumerCloseInGroupSequential(String persister) { + @Test + public void testConsumerCloseInGroupSequential() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -1174,13 +1135,12 @@ public void 
testConsumerCloseInGroupSequential(String persister) { } shareConsumer2.close(); assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInGroupFailureConcurrentConsumption(String persister) + @Test + public void testMultipleConsumersInGroupFailureConcurrentConsumption() throws InterruptedException, ExecutionException, TimeoutException { AtomicInteger totalMessagesConsumed = new AtomicInteger(0); @@ -1221,12 +1181,11 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers int totalSuccessResult = consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum(); assertEquals(producerCount * messagesPerProducer, totalSuccessResult); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcquisitionLockTimeoutOnConsumer(String persister) throws InterruptedException { + @Test + public void testAcquisitionLockTimeoutOnConsumer() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1281,7 +1240,7 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr consumerRecords = shareConsumer.poll(Duration.ofMillis(1000)); assertEquals(0, consumerRecords.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1289,9 +1248,8 
@@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr * Test to verify that the acknowledgement commit callback cannot invoke methods of KafkaShareConsumer. * The exception thrown is verified in {@link TestableAcknowledgementCommitCallbackWithShareConsumer} */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String persister) { + @Test + public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1309,7 +1267,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String // The acknowledgement commit callback will be called and the exception is thrown. // This is verified inside the onComplete() method implementation. shareConsumer.poll(Duration.ofMillis(500)); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1333,9 +1291,8 @@ public void onComplete(Map> offsetsMap, Exception ex * Test to verify that the acknowledgement commit callback can invoke KafkaShareConsumer.wakeup() and it * wakes up the enclosing poll. 
*/ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String persister) throws InterruptedException { + @Test + public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1365,7 +1322,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String per } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1386,9 +1343,8 @@ public void onComplete(Map> offsetsMap, Exception ex * Test to verify that the acknowledgement commit callback can throw an exception, and it is propagated * to the caller of poll(). 
*/ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackThrowsException(String persister) throws InterruptedException { + @Test + public void testAcknowledgementCommitCallbackThrowsException() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1412,7 +1368,7 @@ public void testAcknowledgementCommitCallbackThrowsException(String persister) t } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1427,9 +1383,8 @@ public void onComplete(Map> offsetsMap, Exception ex * Test to verify that calling Thread.interrupt() before KafkaShareConsumer.poll(Duration) * causes it to throw InterruptException */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { + @Test + public void testPollThrowsInterruptExceptionIfInterrupted() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1452,9 +1407,8 @@ public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { * Test to verify that InvalidTopicException is thrown if the consumer subscribes * to an invalid topic. 
*/ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persister) { + @Test + public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1470,9 +1424,8 @@ public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persis * Test to ensure that a wakeup when records are buffered doesn't prevent the records * being returned on the next poll. */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testWakeupWithFetchedRecordsAvailable(String persister) { + @Test + public void testWakeupWithFetchedRecordsAvailable() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1488,13 +1441,12 @@ public void testWakeupWithFetchedRecordsAvailable(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionFollowedByTopicCreation(String persister) throws InterruptedException { + @Test + public void testSubscriptionFollowedByTopicCreation() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), 
new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1518,13 +1470,12 @@ public void testSubscriptionFollowedByTopicCreation(String persister) throws Int producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) throws InterruptedException, ExecutionException { + @Test + public void testSubscriptionAndPollFollowedByTopicDeletion() throws InterruptedException, ExecutionException { String topic1 = "bar"; String topic2 = "baz"; createTopic(topic1); @@ -1561,13 +1512,12 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr producer.send(recordTopic2).get(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testLsoMovementByRecordsDeletion(String persister) { + @Test + public void testLsoMovementByRecordsDeletion() { String groupId = "group1"; alterShareAutoOffsetReset(groupId, "earliest"); @@ -1604,13 +1554,12 @@ public void testLsoMovementByRecordsDeletion(String persister) { messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 1, 5, true); assertEquals(0, messageCount); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = 
"{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetDefaultValue(String persister) { + @Test + public void testShareAutoOffsetResetDefaultValue() { try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { @@ -1631,13 +1580,12 @@ public void testShareAutoOffsetResetDefaultValue(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // Now the next record should be consumed successfully assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetEarliest(String persister) { + @Test + public void testShareAutoOffsetResetEarliest() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { @@ -1657,13 +1605,12 @@ public void testShareAutoOffsetResetEarliest(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // The next records should also be consumed successfully assertEquals(1, records.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) { + @Test + public void testShareAutoOffsetResetEarliestAfterLsoMovement() { alterShareAutoOffsetReset("group1", 
"earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { @@ -1682,13 +1629,12 @@ public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) { int consumedMessageCount = consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true); // The records returned belong to offsets 5-9. assertEquals(5, consumedMessageCount); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) { + @Test + public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue() { alterShareAutoOffsetReset("group1", "earliest"); alterShareAutoOffsetReset("group2", "latest"); try (KafkaShareConsumer shareConsumerEarliest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -1723,13 +1669,12 @@ public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String pers records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); // The next record should also be consumed successfully by group2 assertEquals(1, records2.count()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetByDuration(String persister) throws Exception { + @Test + public void testShareAutoOffsetResetByDuration() throws Exception { // Set auto offset reset to 1 hour before current time alterShareAutoOffsetReset("group1", "by_duration:PT1H"); @@ -1775,13 +1720,12 @@ public void 
testShareAutoOffsetResetByDuration(String persister) throws Exceptio shareConsumer.subscribe(Collections.singleton(tp.topic())); List> records = consumeRecords(shareConsumer, 3); assertEquals(3, records.size()); - maybeVerifyShareGroupStateTopicRecordCount(persister); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetByDurationInvalidFormat(String persister) throws Exception { + @Test + public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception { // Test invalid duration format ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, "group1"); Map> alterEntries = new HashMap<>(); @@ -1974,10 +1918,7 @@ private void warmup() throws InterruptedException { } } - private void maybeVerifyShareGroupStateTopicRecordCount(String persister) { - if (!persister.equals(DEFAULT_STATE_PERSISTER)) { - return; - } + private void verifyShareGroupStateTopicRecordsProduced() { try { TestUtils.waitForCondition(() -> !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), From 4d7b6fc3e1d8cc7a90c741e0a486a1b91cb3b394 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Wed, 15 Jan 2025 15:09:08 +0530 Subject: [PATCH 8/9] inc review comments --- core/src/test/java/kafka/test/api/ShareConsumerTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index e6f98b31a1034..4edb1f5174d51 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -78,7 +78,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -107,7 +106,6 @@ public class ShareConsumerTest { private final TopicPartition tp = new TopicPartition("topic", 0); private final TopicPartition tp2 = new TopicPartition("topic2", 0); private final TopicPartition warmupTp = new TopicPartition("warmup", 0); - private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister"; private List sgsTopicPartitions; private Admin adminClient; @@ -123,7 +121,6 @@ public void createCluster(TestInfo testInfo) throws Exception { .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share") .setConfigProp("group.share.enable", "true") .setConfigProp("group.share.partition.max.record.locks", "10000") - .setConfigProp("group.share.persister.class.name", DEFAULT_STATE_PERSISTER) .setConfigProp("group.share.record.lock.duration.ms", "15000") .setConfigProp("offsets.topic.replication.factor", "1") .setConfigProp("share.coordinator.state.topic.min.isr", "1") @@ -1925,7 +1922,6 @@ private void verifyShareGroupStateTopicRecordsProduced() { DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); Map consumerConfigs = new HashMap<>(); consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); - consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); From cd5443ebd6a5a74b684cab9be80e9abf9b139ff7 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Wed, 15 Jan 2025 19:16:03 +0530 Subject: [PATCH 9/9] remove cache wait code in SGS topic record verification --- .../test/java/kafka/test/api/ShareConsumerTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git 
a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 4edb1f5174d51..237fe34bbe701 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -1897,9 +1897,7 @@ private KafkaShareConsumer createShareConsumer(Deserializer keyD private void warmup() throws InterruptedException { createTopic(warmupTp.topic()); - TestUtils.waitForCondition(() -> - !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), - DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); + waitForMetadataCache(); ProducerRecord record = new ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null, "key".getBytes(), "value".getBytes()); Set subscription = Collections.singleton(warmupTp.topic()); alterShareAutoOffsetReset("warmupgroup1", "earliest"); @@ -1915,11 +1913,14 @@ private void warmup() throws InterruptedException { } } + private void waitForMetadataCache() throws InterruptedException { + TestUtils.waitForCondition(() -> + !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), + DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); + } + private void verifyShareGroupStateTopicRecordsProduced() { try { - TestUtils.waitForCondition(() -> - !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), - DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); Map consumerConfigs = new HashMap<>(); consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());