From 772b516a0651e14a092b54f41ca71b3986bce8cf Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Thu, 4 Jul 2024 23:43:16 +0800 Subject: [PATCH 1/3] KAFKA-17081 Tweak GroupCoordinatorConfig: re-introduce local attributes and validation --- .../org/apache/kafka/common/utils/Utils.java | 5 + .../main/scala/kafka/server/KafkaConfig.scala | 24 +--- .../group/GroupCoordinatorConfig.java | 127 +++++++++++++----- .../group/GroupCoordinatorConfigTest.java | 10 +- 4 files changed, 109 insertions(+), 57 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 79f41c732ab0d..85a910d24bccd 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1656,6 +1656,11 @@ public static void require(boolean requirement) { throw new IllegalArgumentException("requirement failed"); } + public static void require(boolean requirement, String errorMessage) { + if (!requirement) + throw new IllegalArgumentException(errorMessage); + } + /** * Merge multiple {@link ConfigDef} into one * @param configDefs List of {@link ConfigDef} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1f37b0b1ba74f..7ead200679ab3 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -892,8 +892,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami " to prevent unnecessary socket timeouts") require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" + " to prevent frequent changes in ISR") - require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor, - "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") + val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet // validate KRaft-related configs @@ -1077,27 +1076,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde") - // New group coordinator configs validation. - require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}") - - require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") - require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 4740b4e308bdb..2b1be1d8c473a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -39,6 +39,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.LONG; import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; +import static org.apache.kafka.common.utils.Utils.require; /** * The group coordinator configurations. @@ -213,17 +214,86 @@ public class GroupCoordinatorConfig { */ public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000; - private final AbstractConfig config; + private final int numThreads; + private final int appendLingerMs; + private final int consumerGroupSessionTimeoutMs; + private final int consumerGroupHeartbeatIntervalMs; + private final int consumerGroupMaxSize; + private final List consumerGroupAssignors; + private final int offsetsTopicSegmentBytes; + private final int offsetMetadataMaxSize; + private final int classicGroupMaxSize; + private final int classicGroupInitialRebalanceDelayMs; + private final int classicGroupMinSessionTimeoutMs; + private final int classicGroupMaxSessionTimeoutMs; + private final long offsetsRetentionCheckIntervalMs; + private final long offsetsRetentionMs; + private final int offsetCommitTimeoutMs; + private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + private final CompressionType offsetTopicCompressionType; + private final int offsetsLoadBufferSize; + private final int offsetsTopicPartitions; + private final short offsetsTopicReplicationFactor; + private final short offsetCommitRequiredAcks; + private final int consumerGroupMinSessionTimeoutMs; + private final int consumerGroupMaxSessionTimeoutMs; + private final int consumerGroupMinHeartbeatIntervalMs; + private final int consumerGroupMaxHeartbeatIntervalMs; public GroupCoordinatorConfig(AbstractConfig config) { - this.config = config; + this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); + this.appendLingerMs = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG); + this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); + this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); + this.consumerGroupAssignors = config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); + this.offsetsTopicSegmentBytes = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG); + this.offsetMetadataMaxSize = config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG); + this.classicGroupMaxSize = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); + this.classicGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); + this.classicGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + this.classicGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + this.offsetsRetentionCheckIntervalMs = config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); + this.offsetsRetentionMs = config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; + this.offsetCommitTimeoutMs = config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse( + config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); + this.offsetTopicCompressionType = Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) + .map(CompressionType::forId) + .orElse(null); + this.offsetsLoadBufferSize = config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG); + this.offsetsTopicPartitions = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG); + this.offsetsTopicReplicationFactor = config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG); + this.offsetCommitRequiredAcks = config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG); + this.consumerGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + this.consumerGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + + require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor, + String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)); + + // New group coordinator configs validation. + require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs, + String.format("%s must be less than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); + + require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs, + String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); } /** * The number of threads or event loops running. */ public int numThreads() { - return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); + return numThreads; } /** @@ -231,35 +301,35 @@ public int numThreads() { * accumulate before flushing them to disk. */ public int appendLingerMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG); + return appendLingerMs; } /** * The consumer group session timeout in milliseconds. */ public int consumerGroupSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); + return consumerGroupSessionTimeoutMs; } /** * The consumer group heartbeat interval in milliseconds. */ public int consumerGroupHeartbeatIntervalMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + return consumerGroupHeartbeatIntervalMs; } /** * The consumer group maximum size. */ public int consumerGroupMaxSize() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); + return consumerGroupMaxSize; } /** * The consumer group assignors. */ public List consumerGroupAssignors() { - return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); + return consumerGroupAssignors; } /** @@ -267,28 +337,28 @@ public List consumerGroupAssignors() { * log compaction and faster offset loads. */ public int offsetsTopicSegmentBytes() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG); + return offsetsTopicSegmentBytes; } /** * The maximum size for a metadata entry associated with an offset commit. */ public int offsetMetadataMaxSize() { - return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG); + return offsetMetadataMaxSize; } /** * The classic group maximum size. */ public int classicGroupMaxSize() { - return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); + return classicGroupMaxSize; } /** * The delay in milliseconds introduced for the first rebalance of a classic group. */ public int classicGroupInitialRebalanceDelayMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); + return classicGroupInitialRebalanceDelayMs; } /** @@ -302,21 +372,21 @@ public int classicGroupNewMemberJoinTimeoutMs() { * The classic group minimum session timeout. */ public int classicGroupMinSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + return classicGroupMinSessionTimeoutMs; } /** * The classic group maximum session timeout. */ public int classicGroupMaxSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + return classicGroupMaxSessionTimeoutMs; } /** * Frequency at which to check for expired offsets. */ public long offsetsRetentionCheckIntervalMs() { - return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); + return offsetsRetentionCheckIntervalMs; } /** @@ -334,7 +404,7 @@ public long offsetsRetentionCheckIntervalMs() { * committed offsets for that topic will also be deleted without extra retention period. */ public long offsetsRetentionMs() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; + return offsetsRetentionMs; } /** @@ -342,24 +412,21 @@ public long offsetsRetentionMs() { * or this timeout is reached */ public int offsetCommitTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + return offsetCommitTimeoutMs; } /** * The config indicating whether group protocol upgrade/downgrade are allowed. */ public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() { - return ConsumerGroupMigrationPolicy.parse( - config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); + return consumerGroupMigrationPolicy; } /** * The compression type used to compress records in batches. */ public CompressionType offsetTopicCompressionType() { - return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) - .map(CompressionType::forId) - .orElse(null); + return offsetTopicCompressionType; } /** @@ -367,14 +434,14 @@ public CompressionType offsetTopicCompressionType() { * the cache (soft-limit, overridden if records are too large). */ public int offsetsLoadBufferSize() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG); + return offsetsLoadBufferSize; } /** * The number of partitions for the offset commit topic (should not change after deployment). */ public int offsetsTopicPartitions() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG); + return offsetsTopicPartitions; } /** @@ -382,7 +449,7 @@ public int offsetsTopicPartitions() { * Internal topic creation will fail until the cluster size meets this replication factor requirement. */ public short offsetsTopicReplicationFactor() { - return config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG); + return offsetsTopicReplicationFactor; } /** @@ -391,34 +458,34 @@ public short offsetsTopicReplicationFactor() { */ @Deprecated // since 3.8 public short offsetCommitRequiredAcks() { - return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG); + return offsetCommitRequiredAcks; } /** * The minimum allowed session timeout for registered consumers. */ public int consumerGroupMinSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + return consumerGroupMinSessionTimeoutMs; } /** * The maximum allowed session timeout for registered consumers. */ public int consumerGroupMaxSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + return consumerGroupMaxSessionTimeoutMs; } /** * The minimum heartbeat interval for registered consumers. */ public int consumerGroupMinHeartbeatIntervalMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + return consumerGroupMinHeartbeatIntervalMs; } /** * The maximum heartbeat interval for registered consumers. */ public int consumerGroupMaxHeartbeatIntervalMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + return consumerGroupMaxHeartbeatIntervalMs; } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index ba3b357ae67ec..69cf1164f127a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -46,8 +46,8 @@ public void testConfigs() { Map configs = new HashMap<>(); configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10); configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10); - configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 30); - configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 555); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 200); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, 55); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class)); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 2222); @@ -74,8 +74,8 @@ public void testConfigs() { new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); assertEquals(10, config.numThreads()); - assertEquals(30, config.consumerGroupSessionTimeoutMs()); - assertEquals(10, config.consumerGroupHeartbeatIntervalMs()); + assertEquals(555, config.consumerGroupSessionTimeoutMs()); + assertEquals(200, config.consumerGroupHeartbeatIntervalMs()); assertEquals(55, config.consumerGroupMaxSize()); assertEquals(1, config.consumerGroupAssignors().size()); assertEquals(RangeAssignor.RANGE_ASSIGNOR_NAME, config.consumerGroupAssignors().get(0).name()); @@ -110,7 +110,9 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1); configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 45); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 45); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 5); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class)); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 1000); From 8c400cc9038a5c9863c79fb7d9271539c5476fcd Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Fri, 5 Jul 2024 23:46:35 +0800 Subject: [PATCH 2/3] Address comments --- .../group/GroupCoordinatorConfig.java | 3 + .../group/GroupCoordinatorConfigTest.java | 59 ++++++++++++++++++- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 2b1be1d8c473a..9316b4a05ccc8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -43,6 +43,9 @@ /** * The group coordinator configurations. + * This configuration utilizes several local variables instead of calling AbstractConfig#get.... as all configs here + * are static and non-dynamic, with some being accessed extremely frequently (e.g., offsets.commit.timeout.ms). + * Using local variable is advantageous as it avoids the overhead of repeatedly looking up these configurations in AbstractConfig. */ public class GroupCoordinatorConfig { /** ********* Group coordinator configuration ***********/ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 69cf1164f127a..44f8e0ce3dc3e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -32,6 +32,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings("deprecation") public class GroupCoordinatorConfigTest { @@ -70,8 +71,7 @@ public void testConfigs() { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); - GroupCoordinatorConfig config = new GroupCoordinatorConfig( - new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); + GroupCoordinatorConfig config = createConfig(configs); assertEquals(10, config.numThreads()); assertEquals(555, config.consumerGroupSessionTimeoutMs()); @@ -101,6 +101,57 @@ public void testConfigs() { assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs()); } + @Test + public void testInvalidConfigs() { + Map configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) -2); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 3); + assertEquals("offsets.commit.required.acks must be greater or equal to -1 and less or equal to offsets.topic.replication.factor", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + assertEquals("group.consumer.max.heartbeat.interval.ms must be greater than or equals to group.consumer.min.heartbeat.interval.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10); + assertEquals("group.consumer.heartbeat.interval.ms must be greater than or equals to group.consumer.min.heartbeat.interval.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 40); + assertEquals("group.consumer.heartbeat.interval.ms must be less than or equals to group.consumer.max.heartbeat.interval.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 20); + assertEquals("group.consumer.max.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 10); + assertEquals("group.consumer.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 40); + assertEquals("group.consumer.session.timeout.ms must be less than or equals to group.consumer.max.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + } + public static GroupCoordinatorConfig createGroupCoordinatorConfig( int offsetMetadataMaxSize, long offsetsRetentionCheckIntervalMs, @@ -127,6 +178,10 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); + return createConfig(configs); + } + + private static GroupCoordinatorConfig createConfig(Map configs) { return new GroupCoordinatorConfig( new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); } From 9b5aa83f0a943012e39e9c093c2c4a7a594eefae Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 6 Jul 2024 11:52:39 +0800 Subject: [PATCH 3/3] Address comments --- .../group/GroupCoordinatorConfig.java | 3 ++- .../group/GroupCoordinatorConfigTest.java | 22 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 9316b4a05ccc8..ccf367194a054 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -249,7 +249,8 @@ public GroupCoordinatorConfig(AbstractConfig config) { this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); - this.consumerGroupAssignors = config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); + this.consumerGroupAssignors = Collections.unmodifiableList( + config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class)); this.offsetsTopicSegmentBytes = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG); this.offsetMetadataMaxSize = config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG); this.classicGroupMaxSize = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 44f8e0ce3dc3e..80486eeb26c5d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; @@ -150,6 +152,26 @@ public void testInvalidConfigs() { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 40); assertEquals("group.consumer.session.timeout.ms must be less than or equals to group.consumer.max.session.timeout.ms", assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Object.class); + assertEquals("Invalid value class java.lang.Object for configuration group.consumer.assignors: Expected a comma separated list.", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(Object.class)); + assertEquals("class java.lang.Object is not an instance of org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor", + assertThrows(KafkaException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, "foobar"); + assertEquals("Invalid value foobar for configuration group.consumer.migration.policy: String must be one of (case insensitive): DISABLED, DOWNGRADE, UPGRADE, BIDIRECTIONAL", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, -100); + assertEquals("Unknown compression type id: -100", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig(