diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 79f41c732ab0d..85a910d24bccd 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1656,6 +1656,11 @@ public static void require(boolean requirement) { throw new IllegalArgumentException("requirement failed"); } + public static void require(boolean requirement, String errorMessage) { + if (!requirement) + throw new IllegalArgumentException(errorMessage); + } + /** * Merge multiple {@link ConfigDef} into one * @param configDefs List of {@link ConfigDef} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f9d567c2e3874..c429ea01727e9 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -890,8 +890,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) " to prevent unnecessary socket timeouts") require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" + " to prevent frequent changes in ISR") - require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor, - "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") + val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet // validate KRaft-related configs @@ -1075,27 +1074,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde") - // New group coordinator configs validation. - require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}") - - require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") - require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 4740b4e308bdb..ccf367194a054 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -39,9 +39,13 @@ import static org.apache.kafka.common.config.ConfigDef.Type.LONG; import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; +import static org.apache.kafka.common.utils.Utils.require; /** * The group coordinator configurations. + * This configuration utilizes several local variables instead of calling AbstractConfig#get.... as all configs here + * are static and non-dynamic, with some being accessed extremely frequently (e.g., offsets.commit.timeout.ms). + * Using local variable is advantageous as it avoids the overhead of repeatedly looking up these configurations in AbstractConfig. */ public class GroupCoordinatorConfig { /** ********* Group coordinator configuration ***********/ @@ -213,17 +217,87 @@ public class GroupCoordinatorConfig { */ public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000; - private final AbstractConfig config; + private final int numThreads; + private final int appendLingerMs; + private final int consumerGroupSessionTimeoutMs; + private final int consumerGroupHeartbeatIntervalMs; + private final int consumerGroupMaxSize; + private final List consumerGroupAssignors; + private final int offsetsTopicSegmentBytes; + private final int offsetMetadataMaxSize; + private final int classicGroupMaxSize; + private final int classicGroupInitialRebalanceDelayMs; + private final int classicGroupMinSessionTimeoutMs; + private final int classicGroupMaxSessionTimeoutMs; + private final long offsetsRetentionCheckIntervalMs; + private final long offsetsRetentionMs; + private final int offsetCommitTimeoutMs; + private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + private final CompressionType offsetTopicCompressionType; + private final int offsetsLoadBufferSize; + private final int offsetsTopicPartitions; + private final short offsetsTopicReplicationFactor; + private final short offsetCommitRequiredAcks; + private final int consumerGroupMinSessionTimeoutMs; + private final int consumerGroupMaxSessionTimeoutMs; + private final int consumerGroupMinHeartbeatIntervalMs; + private final int consumerGroupMaxHeartbeatIntervalMs; public GroupCoordinatorConfig(AbstractConfig config) { - this.config = config; + this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); + this.appendLingerMs = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG); + this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); + this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); + this.consumerGroupAssignors = Collections.unmodifiableList( + config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class)); + this.offsetsTopicSegmentBytes = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG); + this.offsetMetadataMaxSize = config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG); + this.classicGroupMaxSize = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); + this.classicGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); + this.classicGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + this.classicGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + this.offsetsRetentionCheckIntervalMs = config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); + this.offsetsRetentionMs = config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; + this.offsetCommitTimeoutMs = config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse( + config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); + this.offsetTopicCompressionType = Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) + .map(CompressionType::forId) + .orElse(null); + this.offsetsLoadBufferSize = config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG); + this.offsetsTopicPartitions = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG); + this.offsetsTopicReplicationFactor = config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG); + this.offsetCommitRequiredAcks = config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG); + this.consumerGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + this.consumerGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + + require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor, + String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)); + + // New group coordinator configs validation. + require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs, + String.format("%s must be less than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); + + require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs, + String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs, + String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); } /** * The number of threads or event loops running. */ public int numThreads() { - return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); + return numThreads; } /** @@ -231,35 +305,35 @@ public int numThreads() { * accumulate before flushing them to disk. */ public int appendLingerMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG); + return appendLingerMs; } /** * The consumer group session timeout in milliseconds. */ public int consumerGroupSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); + return consumerGroupSessionTimeoutMs; } /** * The consumer group heartbeat interval in milliseconds. */ public int consumerGroupHeartbeatIntervalMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + return consumerGroupHeartbeatIntervalMs; } /** * The consumer group maximum size. */ public int consumerGroupMaxSize() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); + return consumerGroupMaxSize; } /** * The consumer group assignors. */ public List consumerGroupAssignors() { - return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); + return consumerGroupAssignors; } /** @@ -267,28 +341,28 @@ public List consumerGroupAssignors() { * log compaction and faster offset loads. */ public int offsetsTopicSegmentBytes() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG); + return offsetsTopicSegmentBytes; } /** * The maximum size for a metadata entry associated with an offset commit. */ public int offsetMetadataMaxSize() { - return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG); + return offsetMetadataMaxSize; } /** * The classic group maximum size. */ public int classicGroupMaxSize() { - return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG); + return classicGroupMaxSize; } /** * The delay in milliseconds introduced for the first rebalance of a classic group. */ public int classicGroupInitialRebalanceDelayMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); + return classicGroupInitialRebalanceDelayMs; } /** @@ -302,21 +376,21 @@ public int classicGroupNewMemberJoinTimeoutMs() { * The classic group minimum session timeout. */ public int classicGroupMinSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + return classicGroupMinSessionTimeoutMs; } /** * The classic group maximum session timeout. */ public int classicGroupMaxSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + return classicGroupMaxSessionTimeoutMs; } /** * Frequency at which to check for expired offsets. */ public long offsetsRetentionCheckIntervalMs() { - return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); + return offsetsRetentionCheckIntervalMs; } /** @@ -334,7 +408,7 @@ public long offsetsRetentionCheckIntervalMs() { * committed offsets for that topic will also be deleted without extra retention period. */ public long offsetsRetentionMs() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; + return offsetsRetentionMs; } /** @@ -342,24 +416,21 @@ public long offsetsRetentionMs() { * or this timeout is reached */ public int offsetCommitTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + return offsetCommitTimeoutMs; } /** * The config indicating whether group protocol upgrade/downgrade are allowed. */ public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() { - return ConsumerGroupMigrationPolicy.parse( - config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); + return consumerGroupMigrationPolicy; } /** * The compression type used to compress records in batches. */ public CompressionType offsetTopicCompressionType() { - return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) - .map(CompressionType::forId) - .orElse(null); + return offsetTopicCompressionType; } /** @@ -367,14 +438,14 @@ public CompressionType offsetTopicCompressionType() { * the cache (soft-limit, overridden if records are too large). */ public int offsetsLoadBufferSize() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG); + return offsetsLoadBufferSize; } /** * The number of partitions for the offset commit topic (should not change after deployment). */ public int offsetsTopicPartitions() { - return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG); + return offsetsTopicPartitions; } /** @@ -382,7 +453,7 @@ public int offsetsTopicPartitions() { * Internal topic creation will fail until the cluster size meets this replication factor requirement. */ public short offsetsTopicReplicationFactor() { - return config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG); + return offsetsTopicReplicationFactor; } /** @@ -391,34 +462,34 @@ public short offsetsTopicReplicationFactor() { */ @Deprecated // since 3.8 public short offsetCommitRequiredAcks() { - return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG); + return offsetCommitRequiredAcks; } /** * The minimum allowed session timeout for registered consumers. */ public int consumerGroupMinSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + return consumerGroupMinSessionTimeoutMs; } /** * The maximum allowed session timeout for registered consumers. */ public int consumerGroupMaxSessionTimeoutMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + return consumerGroupMaxSessionTimeoutMs; } /** * The minimum heartbeat interval for registered consumers. */ public int consumerGroupMinHeartbeatIntervalMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + return consumerGroupMinHeartbeatIntervalMs; } /** * The maximum heartbeat interval for registered consumers. */ public int consumerGroupMaxHeartbeatIntervalMs() { - return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + return consumerGroupMaxHeartbeatIntervalMs; } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index ba3b357ae67ec..80486eeb26c5d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; @@ -32,6 +34,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings("deprecation") public class GroupCoordinatorConfigTest { @@ -46,8 +49,8 @@ public void testConfigs() { Map configs = new HashMap<>(); configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10); configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10); - configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 30); - configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 555); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 200); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, 55); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class)); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 2222); @@ -70,12 +73,11 @@ public void testConfigs() { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); - GroupCoordinatorConfig config = new GroupCoordinatorConfig( - new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); + GroupCoordinatorConfig config = createConfig(configs); assertEquals(10, config.numThreads()); - assertEquals(30, config.consumerGroupSessionTimeoutMs()); - assertEquals(10, config.consumerGroupHeartbeatIntervalMs()); + assertEquals(555, config.consumerGroupSessionTimeoutMs()); + assertEquals(200, config.consumerGroupHeartbeatIntervalMs()); assertEquals(55, config.consumerGroupMaxSize()); assertEquals(1, config.consumerGroupAssignors().size()); assertEquals(RangeAssignor.RANGE_ASSIGNOR_NAME, config.consumerGroupAssignors().get(0).name()); @@ -101,6 +103,77 @@ public void testConfigs() { assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs()); } + @Test + public void testInvalidConfigs() { + Map configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) -2); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 3); + assertEquals("offsets.commit.required.acks must be greater or equal to -1 and less or equal to offsets.topic.replication.factor", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + assertEquals("group.consumer.max.heartbeat.interval.ms must be greater than or equals to group.consumer.min.heartbeat.interval.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10); + assertEquals("group.consumer.heartbeat.interval.ms must be greater than or equals to group.consumer.min.heartbeat.interval.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 40); + assertEquals("group.consumer.heartbeat.interval.ms must be less than or equals to group.consumer.max.heartbeat.interval.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 20); + assertEquals("group.consumer.max.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 10); + assertEquals("group.consumer.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 30); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 40); + assertEquals("group.consumer.session.timeout.ms must be less than or equals to group.consumer.max.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Object.class); + assertEquals("Invalid value class java.lang.Object for configuration group.consumer.assignors: Expected a comma separated list.", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(Object.class)); + assertEquals("class java.lang.Object is not an instance of org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor", + assertThrows(KafkaException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, "foobar"); + assertEquals("Invalid value foobar for configuration group.consumer.migration.policy: String must be one of (case insensitive): DISABLED, DOWNGRADE, UPGRADE, BIDIRECTIONAL", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, -100); + assertEquals("Unknown compression type id: -100", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + } + public static GroupCoordinatorConfig createGroupCoordinatorConfig( int offsetMetadataMaxSize, long offsetsRetentionCheckIntervalMs, @@ -110,7 +183,9 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1); configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 45); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 45); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 5); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class)); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 1000); @@ -125,6 +200,10 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); + return createConfig(configs); + } + + private static GroupCoordinatorConfig createConfig(Map configs) { return new GroupCoordinatorConfig( new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); }