From 202a950335f8dd96ce9319f231f53356460f860e Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Wed, 26 Jun 2024 05:04:06 +0000 Subject: [PATCH 1/8] KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig --- .../scala/kafka/server/BrokerServer.scala | 21 +-- .../group/GroupCoordinatorConfig.java | 121 ++++++++------- .../group/GroupCoordinatorService.java | 38 ++--- .../group/GroupCoordinatorShard.java | 24 +-- .../group/OffsetMetadataManager.java | 4 +- .../group/GroupCoordinatorConfigTest.java | 139 ++++++++++-------- .../group/GroupCoordinatorServiceTest.java | 23 +-- .../group/OffsetMetadataManagerTest.java | 21 +-- 8 files changed, 189 insertions(+), 202 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 29f3a5cc0b086..2914880409d3b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -568,26 +568,7 @@ class BrokerServer( if (config.isNewGroupCoordinatorEnabled) { val time = Time.SYSTEM val serde = new CoordinatorRecordSerde - val groupCoordinatorConfig = new GroupCoordinatorConfig( - config.groupCoordinatorNumThreads, - config.groupCoordinatorAppendLingerMs, - config.consumerGroupSessionTimeoutMs, - config.consumerGroupHeartbeatIntervalMs, - config.consumerGroupMaxSize, - config.consumerGroupAssignors, - config.offsetsTopicSegmentBytes, - config.offsetMetadataMaxSize, - config.groupMaxSize, - config.groupInitialRebalanceDelay, - GroupCoordinatorConfig.CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS, - config.groupMinSessionTimeoutMs, - config.groupMaxSessionTimeoutMs, - config.offsetsRetentionCheckIntervalMs, - config.offsetsRetentionMinutes * 60 * 1000L, - config.offsetCommitTimeoutMs, - config.consumerGroupMigrationPolicy, - config.offsetsTopicCompressionType - ) + val groupCoordinatorConfig = new GroupCoordinatorConfig(config) val timer = new SystemTimerReaper( "group-coordinator-reaper", new 
SystemTimer("group-coordinator") diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 9e2bbb4f726cf..ed378c0c24fa0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.Utils; @@ -26,6 +27,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; @@ -211,77 +213,111 @@ public class GroupCoordinatorConfig { */ public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000; + private final AbstractConfig config; + + public GroupCoordinatorConfig(AbstractConfig config) { + this.config = config; + } + /** * The number of threads or event loops running. */ - public final int numThreads; + public int numThreads() { + return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); + } /** * The duration in milliseconds that the coordinator will wait for writes to * accumulate before flushing them to disk. */ - public final int appendLingerMs; + public int appendLingerMs() { + return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG); + } /** * The consumer group session timeout in milliseconds. 
*/ - public final int consumerGroupSessionTimeoutMs; + public int consumerGroupSessionTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); + } /** * The consumer group heartbeat interval in milliseconds. */ - public final int consumerGroupHeartbeatIntervalMs; + public int consumerGroupHeartbeatIntervalMs() { + return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + } /** * The consumer group maximum size. */ - public final int consumerGroupMaxSize; + public int consumerGroupMaxSize() { + return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); + } /** * The consumer group assignors. */ - public final List consumerGroupAssignors; + public List consumerGroupAssignors() { + return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); + } /** * The offsets topic segment bytes should be kept relatively small to facilitate faster * log compaction and faster offset loads. */ - public final int offsetsTopicSegmentBytes; + public int offsetsTopicSegmentBytes() { + return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG); + } /** * The maximum size for a metadata entry associated with an offset commit. */ - public final int offsetMetadataMaxSize; + public int offsetMetadataMaxSize() { + return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG); + } /** * The classic group maximum size. */ - public final int classicGroupMaxSize; + public int classicGroupMaxSize() { + return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); + } /** * The delay in milliseconds introduced for the first rebalance of a classic group. 
*/ - public final int classicGroupInitialRebalanceDelayMs; + public int classicGroupInitialRebalanceDelayMs() { + return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); + } /** * The timeout used to wait for a new member in milliseconds. */ - public final int classicGroupNewMemberJoinTimeoutMs; + public int classicGroupNewMemberJoinTimeoutMs() { + return CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS; + } /** * The classic group minimum session timeout. */ - public final int classicGroupMinSessionTimeoutMs; + public int classicGroupMinSessionTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + } /** * The classic group maximum session timeout. */ - public final int classicGroupMaxSessionTimeoutMs; + public int classicGroupMaxSessionTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + } /** * Frequency at which to check for expired offsets. */ - public final long offsetsRetentionCheckIntervalMs; + public long offsetsRetentionCheckIntervalMs() { + return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); + } /** * For subscribed consumers, committed offset of a specific partition will be expired and discarded when: @@ -297,61 +333,32 @@ public class GroupCoordinatorConfig { * Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's * committed offsets for that topic will also be deleted without extra retention period. 
*/ - public final long offsetsRetentionMs; + public long offsetsRetentionMs() { + return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; + } /** * Offset commit will be delayed until all replicas for the offsets topic receive the commit * or this timeout is reached */ - public final int offsetCommitTimeoutMs; + public int offsetCommitTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + } /** * The config indicating whether group protocol upgrade/downgrade are allowed. */ - public final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() { + return ConsumerGroupMigrationPolicy.parse( + config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); + } /** * The compression type used to compress records in batches. */ - public final CompressionType compressionType; - - public GroupCoordinatorConfig( - int numThreads, - int appendLingerMs, - int consumerGroupSessionTimeoutMs, - int consumerGroupHeartbeatIntervalMs, - int consumerGroupMaxSize, - List consumerGroupAssignors, - int offsetsTopicSegmentBytes, - int offsetMetadataMaxSize, - int classicGroupMaxSize, - int classicGroupInitialRebalanceDelayMs, - int classicGroupNewMemberJoinTimeoutMs, - int classicGroupMinSessionTimeoutMs, - int classicGroupMaxSessionTimeoutMs, - long offsetsRetentionCheckIntervalMs, - long offsetsRetentionMs, - int offsetCommitTimeoutMs, - ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy, - CompressionType compressionType - ) { - this.numThreads = numThreads; - this.appendLingerMs = appendLingerMs; - this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs; - this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; - this.consumerGroupMaxSize = consumerGroupMaxSize; - this.consumerGroupAssignors = consumerGroupAssignors; - this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes; - 
this.offsetMetadataMaxSize = offsetMetadataMaxSize; - this.classicGroupMaxSize = classicGroupMaxSize; - this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs; - this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs; - this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs; - this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs; - this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs; - this.offsetsRetentionMs = offsetsRetentionMs; - this.offsetCommitTimeoutMs = offsetCommitTimeoutMs; - this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy; - this.compressionType = compressionType; + public CompressionType compressionType() { + return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) + .map(CompressionType::forId) + .orElse(null); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index e0528f502afad..24d75e320fd11 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -168,7 +168,7 @@ public GroupCoordinatorService build() { CoordinatorEventProcessor processor = new MultiThreadedEventProcessor( logContext, "group-coordinator-event-processor-", - config.numThreads, + config.numThreads(), time, coordinatorRuntimeMetrics ); @@ -183,12 +183,12 @@ public GroupCoordinatorService build() { .withPartitionWriter(writer) .withLoader(loader) .withCoordinatorShardBuilderSupplier(supplier) - .withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs)) + .withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs())) .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics) 
.withCoordinatorMetrics(groupCoordinatorMetrics) .withSerializer(new CoordinatorRecordSerde()) - .withCompression(Compression.of(config.compressionType).build()) - .withAppendLingerMs(config.appendLingerMs) + .withCompression(Compression.of(config.compressionType()).build()) + .withAppendLingerMs(config.appendLingerMs()) .build(); return new GroupCoordinatorService( @@ -296,7 +296,7 @@ public CompletableFuture consumerGroupHeartb return runtime.scheduleWriteOperation( "consumer-group-heartbeat", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.consumerGroupHeartbeat(context, request) ).exceptionally(exception -> handleOperationException( "consumer-group-heartbeat", @@ -331,8 +331,8 @@ public CompletableFuture joinGroup( ); } - if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs || - request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs) { + if (request.sessionTimeoutMs() < config.classicGroupMinSessionTimeoutMs() || + request.sessionTimeoutMs() > config.classicGroupMaxSessionTimeoutMs()) { return CompletableFuture.completedFuture(new JoinGroupResponseData() .setMemberId(request.memberId()) .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) @@ -344,7 +344,7 @@ public CompletableFuture joinGroup( runtime.scheduleWriteOperation( "classic-group-join", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.classicGroupJoin(context, request, responseFuture) ).exceptionally(exception -> { if (!responseFuture.isDone()) { @@ -387,7 +387,7 @@ public CompletableFuture syncGroup( runtime.scheduleWriteOperation( "classic-group-sync", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> 
coordinator.classicGroupSync(context, request, responseFuture) ).exceptionally(exception -> { if (!responseFuture.isDone()) { @@ -427,7 +427,7 @@ public CompletableFuture heartbeat( return runtime.scheduleWriteOperation( "classic-group-heartbeat", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.classicGroupHeartbeat(context, request) ).exceptionally(exception -> handleOperationException( "classic-group-heartbeat", @@ -469,7 +469,7 @@ public CompletableFuture leaveGroup( return runtime.scheduleWriteOperation( "classic-group-leave", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.classicGroupLeave(context, request) ).exceptionally(exception -> handleOperationException( "classic-group-leave", @@ -682,7 +682,7 @@ public CompletableFuture coordinator.deleteGroups(context, groupList) ).exceptionally(exception -> handleOperationException( "delete-groups", @@ -736,7 +736,7 @@ public CompletableFuture fetch return runtime.scheduleWriteOperation( "fetch-offsets", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> new CoordinatorResult<>( Collections.emptyList(), coordinator.fetchOffsets(request, Long.MAX_VALUE) @@ -787,7 +787,7 @@ public CompletableFuture fetch return runtime.scheduleWriteOperation( "fetch-all-offsets", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> new CoordinatorResult<>( Collections.emptyList(), coordinator.fetchAllOffsets(request, Long.MAX_VALUE) @@ -829,7 +829,7 @@ public CompletableFuture commitOffsets( return runtime.scheduleWriteOperation( "commit-offset", topicPartitionFor(request.groupId()), - 
Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.commitOffset(context, request) ).exceptionally(exception -> handleOperationException( "commit-offset", @@ -868,7 +868,7 @@ public CompletableFuture commitTransactionalOffsets request.transactionalId(), request.producerId(), request.producerEpoch(), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.commitTransactionalOffset(context, request), context.apiVersion() ).exceptionally(exception -> handleOperationException( @@ -903,7 +903,7 @@ public CompletableFuture deleteOffsets( return runtime.scheduleWriteOperation( "delete-offsets", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.deleteOffsets(context, request) ).exceptionally(exception -> handleOperationException( "delete-offsets", @@ -973,7 +973,7 @@ public void onPartitionsDeleted( FutureUtils.mapExceptionally( runtime.scheduleWriteAllOperation( "on-partition-deleted", - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.onPartitionsDeleted(topicPartitions) ), exception -> { @@ -1036,7 +1036,7 @@ public Properties groupMetadataTopicConfigs() { Properties properties = new Properties(); properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name); - properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes)); + properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.offsetsTopicSegmentBytes())); return properties; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 329feca2d3194..56ea19323f906 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -181,16 +181,16 @@ public GroupCoordinatorShard build() { .withSnapshotRegistry(snapshotRegistry) .withTime(time) .withTimer(timer) - .withConsumerGroupAssignors(config.consumerGroupAssignors) - .withConsumerGroupMaxSize(config.consumerGroupMaxSize) - .withConsumerGroupSessionTimeout(config.consumerGroupSessionTimeoutMs) - .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs) - .withClassicGroupMaxSize(config.classicGroupMaxSize) - .withClassicGroupInitialRebalanceDelayMs(config.classicGroupInitialRebalanceDelayMs) - .withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs) - .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs) - .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs) - .withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy) + .withConsumerGroupAssignors(config.consumerGroupAssignors()) + .withConsumerGroupMaxSize(config.consumerGroupMaxSize()) + .withConsumerGroupSessionTimeout(config.consumerGroupSessionTimeoutMs()) + .withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs()) + .withClassicGroupMaxSize(config.classicGroupMaxSize()) + .withClassicGroupInitialRebalanceDelayMs(config.classicGroupInitialRebalanceDelayMs()) + .withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs()) + .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs()) + .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs()) + .withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy()) .withGroupCoordinatorMetricsShard(metricsShard) 
.build(); @@ -595,10 +595,10 @@ public CoordinatorResult cleanupGroupMetadata() { private void scheduleGroupMetadataExpiration() { timer.schedule( GROUP_EXPIRATION_KEY, - config.offsetsRetentionCheckIntervalMs, + config.offsetsRetentionCheckIntervalMs(), TimeUnit.MILLISECONDS, true, - config.offsetsRetentionCheckIntervalMs, + config.offsetsRetentionCheckIntervalMs(), this::cleanupGroupMetadata ); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 600ed165f6613..dad69db30b97a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -412,7 +412,7 @@ private Group validateOffsetDelete( * @return True if the committed metadata is invalid; False otherwise. */ private boolean isMetadataInvalid(String metadata) { - return metadata != null && metadata.length() > config.offsetMetadataMaxSize; + return metadata != null && metadata.length() > config.offsetMetadataMaxSize(); } /** @@ -881,7 +881,7 @@ public boolean cleanupExpiredOffsets(String groupId, List rec if (!group.isSubscribedToTopic(topic)) { partitions.forEach((partition, offsetAndMetadata) -> { // We don't expire the offset yet if there is a pending transactional offset for the partition. 
- if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs) && + if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs()) && !hasPendingTransactionalOffsets(groupId, topic, partition)) { expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString()); log.debug("[GroupId {}] Expired offset for partition={}-{}", groupId, topic, partition); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 8a9947e76bac1..65cf735eaaeca 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -16,84 +16,103 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; public class GroupCoordinatorConfigTest { @Test public void testConfigs() { - ConsumerGroupPartitionAssignor assignor = new RangeAssignor(); - GroupCoordinatorConfig config = new GroupCoordinatorConfig( - 10, - 10, - 30, - 10, - 55, - Collections.singletonList(assignor), - 2222, - 3333, - 60, - 3000, - 5 * 60 * 1000, - 120, - 10 * 60 * 1000, - 600000L, - 24 * 60 * 60 * 1000L, - 5000, - ConsumerGroupMigrationPolicy.DISABLED, - CompressionType.GZIP - ); + Map configs = 
new HashMap<>(); + configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, 55); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class)); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 2222); + configs.put(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, 3333); + configs.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, 60); + configs.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 3000); + configs.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 120); + configs.put(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10 * 60 * 1000); + configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, 600000); + configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG, 24 * 60 * 60 * 1000); + configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.GZIP.id); - assertEquals(10, config.numThreads); - assertEquals(30, config.consumerGroupSessionTimeoutMs); - assertEquals(10, config.consumerGroupHeartbeatIntervalMs); - assertEquals(55, config.consumerGroupMaxSize); - assertEquals(Collections.singletonList(assignor), config.consumerGroupAssignors); - assertEquals(2222, config.offsetsTopicSegmentBytes); - assertEquals(3333, config.offsetMetadataMaxSize); - assertEquals(60, config.classicGroupMaxSize); - assertEquals(3000, 
config.classicGroupInitialRebalanceDelayMs); - assertEquals(5 * 60 * 1000, config.classicGroupNewMemberJoinTimeoutMs); - assertEquals(120, config.classicGroupMinSessionTimeoutMs); - assertEquals(10 * 60 * 1000, config.classicGroupMaxSessionTimeoutMs); - assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs); - assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs); - assertEquals(5000, config.offsetCommitTimeoutMs); - assertEquals(CompressionType.GZIP, config.compressionType); - assertEquals(10, config.appendLingerMs); + GroupCoordinatorConfig config = new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); + + assertEquals(10, config.numThreads()); + assertEquals(30, config.consumerGroupSessionTimeoutMs()); + assertEquals(10, config.consumerGroupHeartbeatIntervalMs()); + assertEquals(55, config.consumerGroupMaxSize()); + assertEquals(1, config.consumerGroupAssignors().size()); + assertEquals(RangeAssignor.RANGE_ASSIGNOR_NAME, config.consumerGroupAssignors().get(0).name()); + assertEquals(2222, config.offsetsTopicSegmentBytes()); + assertEquals(3333, config.offsetMetadataMaxSize()); + assertEquals(60, config.classicGroupMaxSize()); + assertEquals(3000, config.classicGroupInitialRebalanceDelayMs()); + assertEquals(5 * 60 * 1000, config.classicGroupNewMemberJoinTimeoutMs()); + assertEquals(120, config.classicGroupMinSessionTimeoutMs()); + assertEquals(10 * 60 * 1000, config.classicGroupMaxSessionTimeoutMs()); + assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs()); + assertEquals(Duration.ofMinutes(24 * 60 * 60 * 1000L).toMillis(), config.offsetsRetentionMs()); + assertEquals(5000, config.offsetCommitTimeoutMs()); + assertEquals(CompressionType.GZIP, config.compressionType()); + assertEquals(10, config.appendLingerMs()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( int offsetMetadataMaxSize, long offsetsRetentionCheckIntervalMs, - long offsetsRetentionMs + int offsetsRetentionMinutes ) { - 
return new GroupCoordinatorConfig( - 1, - 10, - 45, - 5, - Integer.MAX_VALUE, - Collections.singletonList(new RangeAssignor()), - 1000, - offsetMetadataMaxSize, - Integer.MAX_VALUE, - 3000, - 5 * 60 * 1000, - 120, - 10 * 5 * 1000, - offsetsRetentionCheckIntervalMs, - offsetsRetentionMs, - 5000, - ConsumerGroupMigrationPolicy.DISABLED, - CompressionType.NONE - ); + Map configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1); + configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 45); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class)); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 1000); + configs.put(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, offsetMetadataMaxSize); + configs.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE); + configs.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 3000); + configs.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 120); + configs.put(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10 * 5 * 1000); + configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, offsetsRetentionCheckIntervalMs); + configs.put(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG, offsetsRetentionMinutes); + configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); + + return new 
GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); + } + + private static class GroupCoordinatorTestConfig extends AbstractConfig { + + public GroupCoordinatorTestConfig(Map originals) { + super( + Utils.mergeConfigs(Arrays.asList( + GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, + GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, + GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, + GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)), + originals, + true); + } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 7ad0ecd16a917..24da10e324a7a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -57,7 +57,6 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.TransactionResult; @@ -66,7 +65,6 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; import org.apache.kafka.server.record.BrokerCompressionType; @@ -117,26 +115,7 @@ private CoordinatorRuntime mockRuntime } private GroupCoordinatorConfig createConfig() { - return new GroupCoordinatorConfig( - 1, - 10, - 45, - 5, - Integer.MAX_VALUE, - 
Collections.singletonList(new RangeAssignor()), - 1000, - 4096, - Integer.MAX_VALUE, - 3000, - 5 * 60 * 1000, - 120, - 10 * 5 * 1000, - 600000L, - 24 * 60 * 1000L, - 5000, - ConsumerGroupMigrationPolicy.DISABLED, - CompressionType.NONE - ); + return GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 600000L, 24); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index dd2efb58cfdde..df0d3f84826ed 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -68,6 +68,7 @@ import org.junit.jupiter.params.provider.EnumSource; import java.net.InetAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -105,12 +106,12 @@ public static class Builder { private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { - config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000); + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60); return this; } - Builder withOffsetsRetentionMs(long offsetsRetentionMs) { - config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs); + Builder withOffsetsRetentionMinutes(int offsetsRetentionMinutes) { + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMinutes); return this; } @@ -122,7 +123,7 @@ Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { OffsetMetadataManagerTestContext build() { if (metadataImage == null) metadataImage = 
MetadataImage.EMPTY; if (config == null) { - config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24 * 60 * 1000); + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24); } if (groupMetadataManager == null) { @@ -2515,7 +2516,7 @@ public void testCleanupExpiredOffsets() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() .withGroupMetadataManager(groupMetadataManager) - .withOffsetsRetentionMs(1000) + .withOffsetsRetentionMinutes(1) .build(); long commitTimestamp = context.time.milliseconds(); @@ -2524,7 +2525,7 @@ public void testCleanupExpiredOffsets() { context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp); context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500); - context.time.sleep(1000); + context.time.sleep(Duration.ofMinutes(1).toMillis()); // firstTopic-0: group is still subscribed to firstTopic. Do not expire. // secondTopic-0: should expire as offset retention has passed. 
@@ -2576,7 +2577,7 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() .withGroupMetadataManager(groupMetadataManager) - .withOffsetsRetentionMs(1000) + .withOffsetsRetentionMinutes(1) .build(); long commitTimestamp = context.time.milliseconds(); @@ -2584,7 +2585,7 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() { context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); context.commitOffset(10L, "group-id", "foo", 0, 101L, 0, commitTimestamp + 500); - context.time.sleep(1000); + context.time.sleep(Duration.ofMinutes(1).toMillis()); when(groupMetadataManager.group("group-id")).thenReturn(group); when(group.offsetExpirationCondition()).thenReturn(Optional.of( @@ -3001,7 +3002,7 @@ public void testOffsetsExpiredSensor() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() .withGroupMetadataManager(groupMetadataManager) - .withOffsetsRetentionMs(1000) + .withOffsetsRetentionMinutes(1) .build(); long commitTimestamp = context.time.milliseconds(); @@ -3010,7 +3011,7 @@ public void testOffsetsExpiredSensor() { context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp); context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500); - context.time.sleep(1000); + context.time.sleep(Duration.ofMinutes(1).toMillis()); // firstTopic-0: group is still subscribed to firstTopic. Do not expire. // secondTopic-0: should expire as offset retention has passed. 
From a76175292abec5b7fbb6381673cd008da28ef645 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 29 Jun 2024 20:45:07 +0800 Subject: [PATCH 2/8] Add method to return GroupCoordinatorConfig in KafkaConfig --- .../coordinator/group/GroupCoordinator.scala | 28 +++--- .../server/AutoTopicCreationManager.scala | 4 +- .../scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 53 +++-------- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../metadata/BrokerMetadataPublisher.scala | 2 +- .../DynamicBrokerReconfigurationTest.scala | 2 +- .../GroupCoordinatorConcurrencyTest.scala | 4 +- .../group/GroupCoordinatorTest.scala | 2 +- .../group/GroupMetadataManagerTest.scala | 20 ++-- .../integration/KafkaServerTestHarness.scala | 4 +- .../server/AutoTopicCreationManagerTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 6 +- .../group/GroupCoordinatorConfig.java | 94 +++++++++++++++++++ 15 files changed, 145 insertions(+), 82 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 8f187d9ed3120..d6d1b14b8e9dc 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -1770,16 +1770,16 @@ object GroupCoordinator { @nowarn("cat=deprecation") private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig( - config.offsetMetadataMaxSize, - config.offsetsLoadBufferSize, - config.offsetsRetentionMinutes * 60L * 1000L, - config.offsetsRetentionCheckIntervalMs, - config.offsetsTopicPartitions, - config.offsetsTopicSegmentBytes, - config.offsetsTopicReplicationFactor, - config.offsetsTopicCompressionType, - config.offsetCommitTimeoutMs, - config.offsetCommitRequiredAcks + config.groupCoordinatorConfig.offsetMetadataMaxSize, + 
config.groupCoordinatorConfig.offsetsLoadBufferSize, + config.groupCoordinatorConfig.offsetsRetentionMs, + config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs, + config.groupCoordinatorConfig.offsetsTopicPartitions, + config.groupCoordinatorConfig.offsetsTopicSegmentBytes, + config.groupCoordinatorConfig.offsetsTopicReplicationFactor, + config.groupCoordinatorConfig.compressionType, + config.groupCoordinatorConfig.offsetCommitTimeoutMs, + config.groupCoordinatorConfig.offsetCommitRequiredAcks ) private[group] def apply( @@ -1791,10 +1791,10 @@ object GroupCoordinator { metrics: Metrics ): GroupCoordinator = { val offsetConfig = this.offsetConfig(config) - val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, - groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs, - groupMaxSize = config.groupMaxSize, - groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay) + val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupCoordinatorConfig.groupMinSessionTimeoutMs, + groupMaxSessionTimeoutMs = config.groupCoordinatorConfig.groupMaxSessionTimeoutMs, + groupMaxSize = config.groupCoordinatorConfig.groupMaxSize, + groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.groupInitialRebalanceDelay) val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion, offsetConfig, replicaManager, time, metrics) diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index a84e053c9a976..6d8126cb62550 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -234,8 +234,8 @@ class DefaultAutoTopicCreationManager( case GROUP_METADATA_TOPIC_NAME => new CreatableTopic() .setName(topic) - .setNumPartitions(config.offsetsTopicPartitions) - 
.setReplicationFactor(config.offsetsTopicReplicationFactor) + .setNumPartitions(config.groupCoordinatorConfig.offsetsTopicPartitions) + .setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor) .setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs)) case TRANSACTION_STATE_TOPIC_NAME => new CreatableTopic() diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 2914880409d3b..940afe63e7242 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -577,7 +577,7 @@ class BrokerServer( time, replicaManager, serde, - config.offsetsLoadBufferSize + config.groupCoordinatorConfig.offsetsLoadBufferSize ) val writer = new CoordinatorPartitionWriter( replicaManager diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 919c64498c91f..d8a6da6e9b6c2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -530,7 +530,7 @@ class KafkaApis(val requestChannel: RequestChannel, authorizedTopicsRequest.foreach { topic => topic.partitions.forEach { partition => val error = try { - if (partition.committedMetadata != null && partition.committedMetadata.length > config.offsetMetadataMaxSize) { + if (partition.committedMetadata != null && partition.committedMetadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) { Errors.OFFSET_METADATA_TOO_LARGE } else { zkSupport.zkClient.setOrCreateConsumerOffset( diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b436ec974aebc..1402e41ffee23 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -33,10 +33,8 @@ import org.apache.kafka.common.record.{CompressionType, TimestampType} 
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig @@ -234,6 +232,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig + private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this) + def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig + private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided // Need to translate any system property value from true/false (String) to true/false (Boolean) @@ -439,8 +440,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val logCleanupIntervalMs = getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG) def logCleanupPolicy = getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG) - val offsetsRetentionMinutes = getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) - val offsetsRetentionCheckIntervalMs = getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG) def logRetentionBytes = getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG) val logCleanerDedupeBufferSize = getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP) val 
logCleanerDedupeBufferLoadFactor = getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP) @@ -562,12 +561,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami /** ********* Feature configuration ***********/ def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported - /** ********* Group coordinator configuration ***********/ - val groupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG) - val groupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG) - val groupInitialRebalanceDelay = getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG) - val groupMaxSize = getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG) - /** New group coordinator configs */ val groupCoordinatorRebalanceProtocols = { val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG) @@ -588,19 +581,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // it is explicitly set; or 2) the consumer rebalance protocol is enabled. 
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) || groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER) - val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG) - val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG) - - /** Consumer group configs */ - val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG) - val consumerGroupMinSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG) - val consumerGroupMaxSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG) - val consumerGroupHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG) - val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG) - val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG) - val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG) - val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor]) - val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)) /** Share group configuration **/ val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG) @@ -618,17 +598,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG) val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG) - /** ********* Offset management 
configuration ***********/ - val offsetMetadataMaxSize = getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG) - val offsetsLoadBufferSize = getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG) - val offsetsTopicReplicationFactor = getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG) - val offsetsTopicPartitions = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG) - val offsetCommitTimeoutMs = getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG) - @deprecated("3.8") - val offsetCommitRequiredAcks = getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG) - val offsetsTopicSegmentBytes = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG) - val offsetsTopicCompressionType = Option(getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)).map(value => CompressionType.forId(value)).orNull - /** ********* Transaction management configuration ***********/ val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG) val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG) @@ -917,7 +886,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami " to prevent unnecessary socket timeouts") require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" + " to prevent frequent changes in ISR") - require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor, + require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor, "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet @@ -1066,7 +1035,7 
@@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " + s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher") - if (offsetsTopicCompressionType == CompressionType.ZSTD) + if (groupCoordinatorConfig.compressionType == CompressionType.ZSTD) require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value, "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " + s"is set to version ${IBP_2_1_IV0.shortVersion} or higher") @@ -1099,23 +1068,23 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde") // New group coordinator configs validation. - require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, + require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, + require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs, + require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs, 
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs, + require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs, + require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs, + require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs, s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index be2d81ef17980..3cebfb74f8232 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -501,7 +501,7 @@ class KafkaServer( Time.SYSTEM, metrics ) - groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions)) + groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions)) /* create producer ids 
manager */ val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) { diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 04a063fd21dd2..680b88335815c 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -301,7 +301,7 @@ class BrokerMetadataPublisher( try { // Start the group coordinator. groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME) - .getOrElse(config.offsetsTopicPartitions)) + .getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions)) } catch { case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t) } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 8508607eefde5..4827d1f0a434b 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -163,7 +163,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers) TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers, - numPartitions = servers.head.config.offsetsTopicPartitions, + numPartitions = servers.head.config.groupCoordinatorConfig.offsetsTopicPartitions, replicationFactor = numServers, topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala index 924b8393069a8..6bdf6ade8d47d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala @@ -84,7 +84,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest metrics = new Metrics groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics) - groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions), + groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions), enableMetadataExpiration = false) // Transactional appends attempt to schedule to the request handler thread using @@ -155,7 +155,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest groupCoordinator.shutdown() groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics()) - groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions), + groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions), enableMetadataExpiration = false) val members = new Group(s"group", nMembersPerGroup, groupCoordinator, replicaManager) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 58c6523db6ad6..de4b7e3e795e1 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -121,7 +121,7 @@ class GroupCoordinatorTest { val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false) groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics()) - groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions), + groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions), enableMetadataExpiration = false) // add the partition into the owned partition list diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index f184b625bda87..fa78b20e464b2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -83,16 +83,16 @@ class GroupMetadataManagerTest { @nowarn("cat=deprecation") private val offsetConfig = { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")) - new OffsetConfig(config.offsetMetadataMaxSize, - config.offsetsLoadBufferSize, - config.offsetsRetentionMinutes * 60 * 1000L, - config.offsetsRetentionCheckIntervalMs, - config.offsetsTopicPartitions, - config.offsetsTopicSegmentBytes, - config.offsetsTopicReplicationFactor, - config.offsetsTopicCompressionType, - config.offsetCommitTimeoutMs, - config.offsetCommitRequiredAcks) + new OffsetConfig(config.groupCoordinatorConfig.offsetMetadataMaxSize, + config.groupCoordinatorConfig.offsetsLoadBufferSize, + config.groupCoordinatorConfig.offsetsRetentionMs, + config.groupCoordinatorConfig.offsetsRetentionCheckIntervalMs, + 
config.groupCoordinatorConfig.offsetsTopicPartitions, + config.groupCoordinatorConfig.offsetsTopicSegmentBytes, + config.groupCoordinatorConfig.offsetsTopicReplicationFactor, + config.groupCoordinatorConfig.compressionType, + config.groupCoordinatorConfig.offsetCommitTimeoutMs, + config.groupCoordinatorConfig.offsetCommitRequiredAcks) } @BeforeEach diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index d07bcc1885d19..6b4badb388e6a 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -449,8 +449,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { */ private def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = { val server = servers.head - val numPartitions = server.config.offsetsTopicPartitions - val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt + val numPartitions = server.config.groupCoordinatorConfig.offsetsTopicPartitions + val replicationFactor = server.config.groupCoordinatorConfig.offsetsTopicReplicationFactor.toInt try { TestUtils.createTopic( diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala index d2bb8ea715dd5..9732b0c025dc4 100644 --- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala @@ -356,7 +356,7 @@ class AutoTopicCreationManagerTest { val newTopic = if (isInternal) { topicName match { case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName, - numPartitions = config.offsetsTopicPartitions, replicationFactor = config.offsetsTopicReplicationFactor) + numPartitions = config.groupCoordinatorConfig.offsetsTopicPartitions, replicationFactor = 
config.groupCoordinatorConfig.offsetsTopicReplicationFactor) case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName, numPartitions = config.transactionTopicPartitions, replicationFactor = config.transactionTopicReplicationFactor) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 43a7136d4ce24..4a68637cc1c28 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1227,7 +1227,7 @@ class KafkaConfigTest { assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis) assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis) assertEquals(123L, config.logFlushIntervalMs) - assertEquals(CompressionType.SNAPPY, config.offsetsTopicCompressionType) + assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.compressionType) assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel) assertEquals(false, config.tokenAuthEnabled) assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs) @@ -1916,14 +1916,14 @@ class KafkaConfigTest { ConsumerGroupMigrationPolicy.values.foreach { policy => props.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, policy.toString) val config = KafkaConfig.fromProps(props) - assertEquals(policy, config.consumerGroupMigrationPolicy) + assertEquals(policy, config.groupCoordinatorConfig.consumerGroupMigrationPolicy) } // The config is case-insensitive. 
ConsumerGroupMigrationPolicy.values.foreach { policy => props.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, policy.toString.toUpperCase()) val config = KafkaConfig.fromProps(props) - assertEquals(policy, config.consumerGroupMigrationPolicy) + assertEquals(policy, config.groupCoordinatorConfig.consumerGroupMigrationPolicy) } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index ed378c0c24fa0..0c2aa408875d0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -361,4 +361,98 @@ public CompressionType compressionType() { .map(CompressionType::forId) .orElse(null); } + + /** + * Batch size for reading from the offsets segments when loading offsets into + * the cache (soft-limit, overridden if records are too large). + */ + public int offsetsLoadBufferSize() { + return config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG); + } + + /** + * The number of partitions for the offset commit topic (should not change after deployment). + */ + public int offsetsTopicPartitions() { + return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG); + } + + /** + * The replication factor for the offsets topic (set higher to ensure availability). + * Internal topic creation will fail until the cluster size meets this replication factor requirement. + */ + public short offsetsTopicReplicationFactor() { + return config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG); + } + + /** + * DEPRECATED: The required acks before the commit can be accepted. + * In general, the default (-1) should not be overridden. 
+ */ + @Deprecated // since 3.8 + public short offsetCommitRequiredAcks() { + return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG); + } + + /** + * The minimum allowed session timeout for registered consumers. + * Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, + * which can overwhelm broker resources. + */ + public int groupMinSessionTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + } + + /** + * The maximum allowed session timeout for registered consumers. + * Longer timeouts give consumers more time to process messages in between heartbeats at the cost of + * a longer time to detect failures. + */ + public int groupMaxSessionTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + } + + /** + * The amount of time the group coordinator will wait for more consumers to join a new group before performing + * the first rebalance. + * A longer delay means potentially fewer rebalances, but increases the time until processing begins. + */ + public int groupInitialRebalanceDelay() { + return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); + } + + /** + * The maximum number of consumers that a single consumer group can accommodate. + */ + public int groupMaxSize() { + return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); + } + + /** + * The minimum allowed session timeout for registered consumers. + */ + public int consumerGroupMinSessionTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + } + + /** + * The maximum allowed session timeout for registered consumers. + */ + public int consumerGroupMaxSessionTimeoutMs() { + return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + } + + /** + * The minimum heartbeat interval for registered consumers. 
+ */ + public int consumerGroupMinHeartbeatIntervalMs() { + return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + } + + /** + * The maximum heartbeat interval for registered consumers. + */ + public int consumerGroupMaxHeartbeatIntervalMs() { + return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + } } From 1231164924ba3de6a192ccb124d83cb67d898e16 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 29 Jun 2024 20:47:31 +0800 Subject: [PATCH 3/8] Rename GroupCoordinatorConfig#compressionType to offsetTopicCompressionType --- .../main/scala/kafka/coordinator/group/GroupCoordinator.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- .../unit/kafka/coordinator/group/GroupMetadataManagerTest.scala | 2 +- core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 2 +- .../apache/kafka/coordinator/group/GroupCoordinatorConfig.java | 2 +- .../apache/kafka/coordinator/group/GroupCoordinatorService.java | 2 +- .../kafka/coordinator/group/GroupCoordinatorConfigTest.java | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index d6d1b14b8e9dc..8ddec4877d79a 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -1777,7 +1777,7 @@ object GroupCoordinator { config.groupCoordinatorConfig.offsetsTopicPartitions, config.groupCoordinatorConfig.offsetsTopicSegmentBytes, config.groupCoordinatorConfig.offsetsTopicReplicationFactor, - config.groupCoordinatorConfig.compressionType, + config.groupCoordinatorConfig.offsetTopicCompressionType, config.groupCoordinatorConfig.offsetCommitTimeoutMs, config.groupCoordinatorConfig.offsetCommitRequiredAcks ) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1402e41ffee23..9061e0fb9bf9c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1035,7 +1035,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " + s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher") - if (groupCoordinatorConfig.compressionType == CompressionType.ZSTD) + if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD) require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value, "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " + s"is set to version ${IBP_2_1_IV0.shortVersion} or higher") diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index fa78b20e464b2..97c7c2ee747da 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -90,7 +90,7 @@ class GroupMetadataManagerTest { config.groupCoordinatorConfig.offsetsTopicPartitions, config.groupCoordinatorConfig.offsetsTopicSegmentBytes, config.groupCoordinatorConfig.offsetsTopicReplicationFactor, - config.groupCoordinatorConfig.compressionType, + config.groupCoordinatorConfig.offsetTopicCompressionType, config.groupCoordinatorConfig.offsetCommitTimeoutMs, config.groupCoordinatorConfig.offsetCommitRequiredAcks) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 4a68637cc1c28..c8a6a5a773006 100755 --- 
a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1227,7 +1227,7 @@ class KafkaConfigTest { assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis) assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis) assertEquals(123L, config.logFlushIntervalMs) - assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.compressionType) + assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.offsetTopicCompressionType) assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel) assertEquals(false, config.tokenAuthEnabled) assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 0c2aa408875d0..2abd880a72c6f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -356,7 +356,7 @@ public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() { /** * The compression type used to compress records in batches. 
*/ - public CompressionType compressionType() { + public CompressionType offsetTopicCompressionType() { return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) .map(CompressionType::forId) .orElse(null); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 24d75e320fd11..8d4845b23b6d0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -187,7 +187,7 @@ public GroupCoordinatorService build() { .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics) .withCoordinatorMetrics(groupCoordinatorMetrics) .withSerializer(new CoordinatorRecordSerde()) - .withCompression(Compression.of(config.compressionType()).build()) + .withCompression(Compression.of(config.offsetTopicCompressionType()).build()) .withAppendLingerMs(config.appendLingerMs()) .build(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 65cf735eaaeca..ccc7838c8702a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -71,7 +71,7 @@ public void testConfigs() { assertEquals(10 * 60 * 1000, config.offsetsRetentionCheckIntervalMs()); assertEquals(Duration.ofMinutes(24 * 60 * 60 * 1000L).toMillis(), config.offsetsRetentionMs()); assertEquals(5000, config.offsetCommitTimeoutMs()); - assertEquals(CompressionType.GZIP, config.compressionType()); + assertEquals(CompressionType.GZIP, config.offsetTopicCompressionType()); 
assertEquals(10, config.appendLingerMs()); } From ffee5fc5c044e87973dc309045e0342c46a6814f Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 29 Jun 2024 21:56:07 +0800 Subject: [PATCH 4/8] Remove duplicate configs in GroupCoordinatorConfig --- .../coordinator/group/GroupCoordinator.scala | 8 ++--- .../group/GroupCoordinatorConfig.java | 34 ------------------- 2 files changed, 4 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 8ddec4877d79a..58a37e3333b5d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -1791,10 +1791,10 @@ object GroupCoordinator { metrics: Metrics ): GroupCoordinator = { val offsetConfig = this.offsetConfig(config) - val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupCoordinatorConfig.groupMinSessionTimeoutMs, - groupMaxSessionTimeoutMs = config.groupCoordinatorConfig.groupMaxSessionTimeoutMs, - groupMaxSize = config.groupCoordinatorConfig.groupMaxSize, - groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.groupInitialRebalanceDelay) + val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMinSessionTimeoutMs, + groupMaxSessionTimeoutMs = config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs, + groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize, + groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs) val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion, offsetConfig, replicaManager, time, metrics) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 
2abd880a72c6f..4740b4e308bdb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -394,40 +394,6 @@ public short offsetCommitRequiredAcks() { return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG); } - /** - * The minimum allowed session timeout for registered consumers. - * Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, - * which can overwhelm broker resources. - */ - public int groupMinSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); - } - - /** - * The maximum allowed session timeout for registered consumers. - * Longer timeouts give consumers more time to process messages in between heartbeats at the cost of - * a longer time to detect failures. - */ - public int groupMaxSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); - } - - /** - * The amount of time the group coordinator will wait for more consumers to join a new group before performing - * the first rebalance. - * A longer delay means potentially fewer rebalances, but increases the time until processing begins. - */ - public int groupInitialRebalanceDelay() { - return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); - } - - /** - * The maximum number of consumers that a single consumer group can accommodate. - */ - public int groupMaxSize() { - return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); - } - /** * The minimum allowed session timeout for registered consumers. 
*/ From 524bd558b45c24893ed8a89c68cb59ce86468f0d Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 29 Jun 2024 22:03:21 +0800 Subject: [PATCH 5/8] Add more unit tests --- .../group/GroupCoordinatorConfigTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index ccc7838c8702a..2a74c84133265 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +@SuppressWarnings("deprecation") public class GroupCoordinatorConfigTest { @Test public void testConfigs() { @@ -52,6 +53,14 @@ public void testConfigs() { configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.GZIP.id); + configs.put(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG, 555); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 111); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 11); + configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) 0); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 333); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); GroupCoordinatorConfig config 
= new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); @@ -73,6 +82,14 @@ public void testConfigs() { assertEquals(5000, config.offsetCommitTimeoutMs()); assertEquals(CompressionType.GZIP, config.offsetTopicCompressionType()); assertEquals(10, config.appendLingerMs()); + assertEquals(555, config.offsetsLoadBufferSize()); + assertEquals(111, config.offsetsTopicPartitions()); + assertEquals(11, config.offsetsTopicReplicationFactor()); + assertEquals(0, config.offsetCommitRequiredAcks()); + assertEquals(333, config.consumerGroupMinSessionTimeoutMs()); + assertEquals(666, config.consumerGroupMaxSessionTimeoutMs()); + assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs()); + assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( From 4c21247db11b42a12b91184eff43e0fe0d883e5e Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 29 Jun 2024 22:08:13 +0800 Subject: [PATCH 6/8] Address comments --- core/src/main/scala/kafka/server/BrokerServer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 6b0debeb1b9c0..a11231d9e3002 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -568,7 +568,6 @@ class BrokerServer( if (config.isNewGroupCoordinatorEnabled) { val time = Time.SYSTEM val serde = new CoordinatorRecordSerde - val groupCoordinatorConfig = new GroupCoordinatorConfig(config) val timer = new SystemTimerReaper( "group-coordinator-reaper", new SystemTimer("group-coordinator") @@ -582,7 +581,7 @@ class BrokerServer( val writer = new CoordinatorPartitionWriter( replicaManager ) - new GroupCoordinatorService.Builder(config.brokerId, groupCoordinatorConfig) + new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig) .withTime(time) 
.withTimer(timer) .withLoader(loader) From bad7334326baa614e2ddc04725d204875e7dab08 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 29 Jun 2024 22:39:10 +0800 Subject: [PATCH 7/8] Remove unused import --- core/src/main/scala/kafka/server/BrokerServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index a11231d9e3002..074c9b12f4ccf 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid} import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics} -import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde} +import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde} import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher} import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange} import org.apache.kafka.security.CredentialProvider From 9e75d887af16f52e36476541dcb55289c65c8005 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sun, 30 Jun 2024 23:21:28 +0800 Subject: [PATCH 8/8] Address comment --- .../group/GroupCoordinatorConfigTest.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 2a74c84133265..ba3b357ae67ec 100644 ---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; @@ -27,12 +28,19 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; @SuppressWarnings("deprecation") public class GroupCoordinatorConfigTest { + private static final List GROUP_COORDINATOR_CONFIG_DEFS = Arrays.asList( + GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, + GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, + GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, + GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF); + @Test public void testConfigs() { Map configs = new HashMap<>(); @@ -62,7 +70,8 @@ public void testConfigs() { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); - GroupCoordinatorConfig config = new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); + GroupCoordinatorConfig config = new GroupCoordinatorConfig( + new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); assertEquals(10, config.numThreads()); assertEquals(30, config.consumerGroupSessionTimeoutMs()); @@ -116,20 +125,7 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); 
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); - return new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs)); - } - - private static class GroupCoordinatorTestConfig extends AbstractConfig { - - public GroupCoordinatorTestConfig(Map originals) { - super( - Utils.mergeConfigs(Arrays.asList( - GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, - GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, - GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, - GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)), - originals, - true); - } + return new GroupCoordinatorConfig( + new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); } }