From 43d1c4e4df4b49280d2ed46a46aa46b0e360496a Mon Sep 17 00:00:00 2001 From: OmniaGM Date: Tue, 2 Jul 2024 12:25:10 +0100 Subject: [PATCH 1/4] KAFKA-15853: Refactor ShareGroupConfig with AbstractConfig --- .../org/apache/kafka/common/utils/Utils.java | 10 ++ .../main/scala/kafka/server/KafkaConfig.scala | 49 +--------- .../unit/kafka/server/KafkaConfigTest.scala | 66 ++++++------- .../server/config/AbstractKafkaConfig.java | 2 +- ...roupConfigs.java => ShareGroupConfig.java} | 97 ++++++++++++++++++- 5 files changed, 145 insertions(+), 79 deletions(-) rename server/src/main/java/org/apache/kafka/server/config/{ShareGroupConfigs.java => ShareGroupConfig.java} (64%) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 79f41c732ab0d..81b1d7d335915 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1656,6 +1656,16 @@ public static void require(boolean requirement) { throw new IllegalArgumentException("requirement failed"); } + /** + * Checks requirement. Throw {@link IllegalArgumentException} if {@code requirement} failed. + * @param requirement Requirement to check. + * @param message String to include in the failure message + */ + public static void require(boolean requirement, String message) { + if (!requirement) + throw new IllegalArgumentException("requirement failed: " + message); + } + /** * Merge multiple {@link ConfigDef} into one * @param configDefs List of {@link ConfigDef} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1f37b0b1ba74f..820738ffa05da 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -44,7 +44,7 @@ import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion._ -import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfigs, ZkConfigs} +import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.util.Csv @@ -233,7 +233,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def remoteLogManagerConfig = _remoteLogManagerConfig private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this) + private val _shareGroupConfig = new ShareGroupConfig(this) + def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig + def shareGroupConfig: ShareGroupConfig = _shareGroupConfig; private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided @@ -582,22 +585,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) || groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER) - /** Share group configuration **/ - val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG) - val shareGroupPartitionMaxRecordLocks = getInt(ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG) - val shareGroupDeliveryCountLimit = getInt(ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG) - val shareGroupMaxGroups = getShort(ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG) - val shareGroupMaxSize = getShort(ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG) - val shareGroupSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG) - val shareGroupMinSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG) - val shareGroupMaxSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG) - val shareGroupHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG) - val shareGroupMinHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG) - val shareGroupMaxHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG) - val shareGroupRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG) - val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG) - val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG) - /** ********* Transaction management configuration ***********/ val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG) val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG) @@ -1098,33 +1085,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") - require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, - s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, - s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs, - s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}") - - require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs, - s"${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs, - s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs, - s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") - - require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs, - s"${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}") - require(shareGroupMaxRecordLockDurationMs >= shareGroupRecordLockDurationMs, - s"${ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG}") - + _shareGroupConfig.validate(); if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) { warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.") } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index c63fa40f8dd07..5252e1c347270 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1} -import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ShareGroupConfigs, ZkConfigs} +import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ShareGroupConfig, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.storage.internals.log.CleanerConfig @@ -1128,19 +1128,19 @@ class KafkaConfigTest { case GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG => // ignore string /** Share groups configs */ - case ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) - case ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1") @@ -1997,16 +1997,16 @@ class KafkaConfigTest { props.putAll(kraftProps()) // Max should be greater than or equals to min. - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "20") - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "10") + props.put(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "20") + props.put(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "10") assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) // The timeout should be within the min-max range. - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10") - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "20") - props.put(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "5") + props.put(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10") + props.put(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "20") + props.put(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "5") assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) - props.put(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "25") + props.put(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "25") assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) } @@ -2016,16 +2016,16 @@ class KafkaConfigTest { props.putAll(kraftProps()) // Max should be greater than or equals to min. - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "20") - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "10") + props.put(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "20") + props.put(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "10") assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) // The timeout should be within the min-max range. - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "10") - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "20") - props.put(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "5") + props.put(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "10") + props.put(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "20") + props.put(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "5") assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) - props.put(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "25") + props.put(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "25") assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) } @@ -2034,19 +2034,19 @@ class KafkaConfigTest { val props = new Properties() props.putAll(kraftProps()) - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10") + props.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10") assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10000000") + props.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10000000") assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) // The duration should be within the min-max range. - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "1000") - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "3600000") - props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "999") + props.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "1000") + props.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "3600000") + props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "999") assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) - props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "3600001") + props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "3600001") assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) - props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000") + props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000") assertDoesNotThrow(() => KafkaConfig.fromProps(props)) } } diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index 245562e0fc382..a19cbf742e6fd 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -55,7 +55,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, CleanerConfig.CONFIG_DEF, LogConfig.SERVER_CONFIG_DEF, - ShareGroupConfigs.CONFIG_DEF, + ShareGroupConfig.CONFIG_DEF, TransactionLogConfigs.CONFIG_DEF, TransactionStateManagerConfigs.CONFIG_DEF, QuorumConfig.CONFIG_DEF, diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java similarity index 64% rename from server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java rename to server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java index 2957686f6aab5..6dd8473b06b21 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.server.config; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.Utils; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; @@ -25,7 +27,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.SHORT; -public class ShareGroupConfigs { +public class ShareGroupConfig { /** Share Group Configurations **/ // Internal configuration used by integration and system tests. @@ -100,4 +102,97 @@ public class ShareGroupConfigs { .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC) .define(SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); + + private final AbstractConfig config; + + public ShareGroupConfig(AbstractConfig config) { + this.config = config; + } + + /** Share group configuration **/ + Boolean isShareGroupEnabled() { + return config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG); + } + + public int shareGroupPartitionMaxRecordLocks() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG); + } + + public int shareGroupDeliveryCountLimit() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG); + } + + public short shareGroupMaxGroups() { + return config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG); + } + + public short shareGroupMaxSize() { + return config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_SIZE_CONFIG); + } + + public int shareGroupSessionTimeoutMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG); + } + + public int shareGroupMinSessionTimeoutMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + } + + public int shareGroupMaxSessionTimeoutMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + } + + public int shareGroupHeartbeatIntervalMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + } + + public int shareGroupMinHeartbeatIntervalMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + } + + public int shareGroupMaxHeartbeatIntervalMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + } + + public int shareGroupRecordLockDurationMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG); + } + + public int shareGroupMaxRecordLockDurationMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); + } + + public int shareGroupMinRecordLockDurationMs() { + return config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG); + } + + public void validate() { + Utils.require(shareGroupMaxHeartbeatIntervalMs() >= shareGroupMinHeartbeatIntervalMs(), + String.format("%s must be greater than or equals to %s", + SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + Utils.require(shareGroupHeartbeatIntervalMs() >= shareGroupMinHeartbeatIntervalMs(), + String.format("%s must be greater than or equals to %s", + SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + Utils.require(shareGroupHeartbeatIntervalMs() <= shareGroupMaxHeartbeatIntervalMs(), + String.format("%s must be less than or equals to %s", + SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); + + Utils.require(shareGroupMaxSessionTimeoutMs() >= shareGroupMinSessionTimeoutMs(), + String.format("%s must be greater than or equals to %s", + SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + Utils.require(shareGroupSessionTimeoutMs() >= shareGroupMinSessionTimeoutMs(), + String.format("%s must be greater than or equals to %s", + SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + Utils.require(shareGroupSessionTimeoutMs() <= shareGroupMaxSessionTimeoutMs(), + String.format("%s must be less than or equals to %s", + SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); + + Utils.require(shareGroupRecordLockDurationMs() >= shareGroupMinRecordLockDurationMs(), + String.format("%s must be greater than or equals to %s", + SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)); + Utils.require(shareGroupMaxRecordLockDurationMs() >= shareGroupRecordLockDurationMs(), + String.format("%s must be greater than or equals to %s", + SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)); + + } } From 5b4b86286c3e5f240336ebf45ad6596f787e5b1d Mon Sep 17 00:00:00 2001 From: OmniaGM Date: Tue, 9 Jul 2024 13:30:10 +0100 Subject: [PATCH 2/4] Introduce attributes --- .../org/apache/kafka/common/utils/Utils.java | 11 +-- .../main/scala/kafka/server/KafkaConfig.scala | 1 - .../kafka/server/config/ShareGroupConfig.java | 77 +++++++++++++------ 3 files changed, 55 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 3911aacb0f367..4485b1bd66dbf 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1662,19 +1662,14 @@ public static void require(boolean requirement) { throw new IllegalArgumentException("requirement failed"); } - public static void require(boolean requirement, String errorMessage) { - if (!requirement) - throw new IllegalArgumentException(errorMessage); - } - /** * Checks requirement. Throw {@link IllegalArgumentException} if {@code requirement} failed. * @param requirement Requirement to check. - * @param message String to include in the failure message + * @param errorMessage String to include in the failure message */ - public static void require(boolean requirement, String message) { + public static void require(boolean requirement, String errorMessage) { if (!requirement) - throw new IllegalArgumentException("requirement failed: " + message); + throw new IllegalArgumentException(errorMessage); } /** diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 900dad82df068..8313b3c142076 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1079,7 +1079,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") - _shareGroupConfig.validate(); if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) { warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.") } diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java index 6dd8473b06b21..c3f3d4754c6b2 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java @@ -103,94 +103,121 @@ public class ShareGroupConfig { .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC) .define(SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); - private final AbstractConfig config; + private final Boolean isShareGroupEnabled; + private final int shareGroupPartitionMaxRecordLocks; + private final int shareGroupDeliveryCountLimit; + private final short shareGroupMaxGroups; + private final short shareGroupMaxSize; + private final int shareGroupSessionTimeoutMs; + private final int shareGroupMinSessionTimeoutMs; + private final int shareGroupMaxSessionTimeoutMs; + private final int shareGroupHeartbeatIntervalMs; + private final int shareGroupMinHeartbeatIntervalMs; + private final int shareGroupMaxHeartbeatIntervalMs; + private final int shareGroupRecordLockDurationMs; + private final int shareGroupMaxRecordLockDurationMs; + private final int shareGroupMinRecordLockDurationMs; public ShareGroupConfig(AbstractConfig config) { - this.config = config; + isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG); + shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG); + shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG); + shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG); + shareGroupSessionTimeoutMs = config.getInt(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG); + shareGroupMinSessionTimeoutMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + shareGroupMaxSessionTimeoutMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + shareGroupHeartbeatIntervalMs = config.getInt(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + shareGroupMinHeartbeatIntervalMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + shareGroupMaxHeartbeatIntervalMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + shareGroupRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG); + shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); + shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG); + shareGroupMaxSize = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_SIZE_CONFIG); + validate(); } /** Share group configuration **/ Boolean isShareGroupEnabled() { - return config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG); + return isShareGroupEnabled; } public int shareGroupPartitionMaxRecordLocks() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG); + return shareGroupPartitionMaxRecordLocks; } public int shareGroupDeliveryCountLimit() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG); + return shareGroupDeliveryCountLimit; } public short shareGroupMaxGroups() { - return config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG); + return shareGroupMaxGroups; } public short shareGroupMaxSize() { - return config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_SIZE_CONFIG); + return shareGroupMaxSize; } public int shareGroupSessionTimeoutMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG); + return shareGroupSessionTimeoutMs; } public int shareGroupMinSessionTimeoutMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); + return shareGroupMinSessionTimeoutMs; } public int shareGroupMaxSessionTimeoutMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); + return shareGroupMaxSessionTimeoutMs; } public int shareGroupHeartbeatIntervalMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + return shareGroupHeartbeatIntervalMs; } public int shareGroupMinHeartbeatIntervalMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); + return shareGroupMinHeartbeatIntervalMs; } public int shareGroupMaxHeartbeatIntervalMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); + return shareGroupMaxHeartbeatIntervalMs; } public int shareGroupRecordLockDurationMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG); + return shareGroupRecordLockDurationMs; } public int shareGroupMaxRecordLockDurationMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); + return shareGroupMaxRecordLockDurationMs; } public int shareGroupMinRecordLockDurationMs() { - return config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG); + return shareGroupMinRecordLockDurationMs; } - public void validate() { - Utils.require(shareGroupMaxHeartbeatIntervalMs() >= shareGroupMinHeartbeatIntervalMs(), + private void validate() { + Utils.require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, String.format("%s must be greater than or equals to %s", SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); - Utils.require(shareGroupHeartbeatIntervalMs() >= shareGroupMinHeartbeatIntervalMs(), + Utils.require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, String.format("%s must be greater than or equals to %s", SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); - Utils.require(shareGroupHeartbeatIntervalMs() <= shareGroupMaxHeartbeatIntervalMs(), + Utils.require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs, String.format("%s must be less than or equals to %s", SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); - Utils.require(shareGroupMaxSessionTimeoutMs() >= shareGroupMinSessionTimeoutMs(), + Utils.require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs, String.format("%s must be greater than or equals to %s", SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); - Utils.require(shareGroupSessionTimeoutMs() >= shareGroupMinSessionTimeoutMs(), + Utils.require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs, String.format("%s must be greater than or equals to %s", SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); - Utils.require(shareGroupSessionTimeoutMs() <= shareGroupMaxSessionTimeoutMs(), + Utils.require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs, String.format("%s must be less than or equals to %s", SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); - Utils.require(shareGroupRecordLockDurationMs() >= shareGroupMinRecordLockDurationMs(), + Utils.require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs, String.format("%s must be greater than or equals to %s", SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)); - Utils.require(shareGroupMaxRecordLockDurationMs() >= shareGroupRecordLockDurationMs(), + Utils.require(shareGroupMaxRecordLockDurationMs >= shareGroupRecordLockDurationMs, String.format("%s must be greater than or equals to %s", SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)); From 5fa3037a604b5f43cd2a7406981a24a78f7f5df9 Mon Sep 17 00:00:00 2001 From: OmniaGM Date: Tue, 9 Jul 2024 14:14:38 +0100 Subject: [PATCH 3/4] fix merge --- .../main/scala/kafka/server/KafkaConfig.scala | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 8313b3c142076..6d3587da145d5 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1058,26 +1058,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde") - // New group coordinator configs validation. - require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}") - - require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}") - require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs, - s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + - s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) { warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.") From 705f445891b1903d7ac7466f3934c36c14c4c976 Mon Sep 17 00:00:00 2001 From: OmniaGM Date: Wed, 10 Jul 2024 12:00:49 +0100 Subject: [PATCH 4/4] feedback --- .../java/org/apache/kafka/server/config/ShareGroupConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java index c3f3d4754c6b2..43b1c8a05c77c 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java @@ -103,7 +103,7 @@ public class ShareGroupConfig { .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC) .define(SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); - private final Boolean isShareGroupEnabled; + private final boolean isShareGroupEnabled; private final int shareGroupPartitionMaxRecordLocks; private final int shareGroupDeliveryCountLimit; private final short shareGroupMaxGroups; @@ -137,7 +137,7 @@ public ShareGroupConfig(AbstractConfig config) { } /** Share group configuration **/ - Boolean isShareGroupEnabled() { + public boolean isShareGroupEnabled() { return isShareGroupEnabled; }