Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,11 @@ public static void require(boolean requirement) {
throw new IllegalArgumentException("requirement failed");
}

/**
* Checks requirement. Throw {@link IllegalArgumentException} if {@code requirement} failed.
* @param requirement Requirement to check.
* @param errorMessage String to include in the failure message
*/
public static void require(boolean requirement, String errorMessage) {
if (!requirement)
throw new IllegalArgumentException(errorMessage);
Expand Down
48 changes: 5 additions & 43 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfigs, ZkConfigs}
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv
Expand Down Expand Up @@ -231,8 +231,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def remoteLogManagerConfig = _remoteLogManagerConfig

private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)

def groupCoordinatorConfig: GroupCoordinatorConfig = _groupCoordinatorConfig

private val _shareGroupConfig = new ShareGroupConfig(this)
def shareGroupConfig: ShareGroupConfig = _shareGroupConfig;

private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to true/false (Boolean)
Expand Down Expand Up @@ -580,22 +584,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)

/** Share group configuration **/
val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG)
val shareGroupPartitionMaxRecordLocks = getInt(ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)
val shareGroupDeliveryCountLimit = getInt(ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)
val shareGroupMaxGroups = getShort(ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG)
val shareGroupMaxSize = getShort(ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG)
val shareGroupSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)
val shareGroupMinSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
val shareGroupMaxSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
val shareGroupHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)
val shareGroupMinHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
val shareGroupMaxHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
val shareGroupRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)
val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG)
val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)

/** ********* Transaction management configuration ***********/
val transactionalIdExpirationMs = getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
val transactionMaxTimeoutMs = getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
Expand Down Expand Up @@ -1070,32 +1058,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")

require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs,
s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")

require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
s"${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")

require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
s"${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}")
require(shareGroupMaxRecordLockDurationMs >= shareGroupRecordLockDurationMs,
s"${ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG}")

if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) {
warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.")
Expand Down
66 changes: 33 additions & 33 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ShareGroupConfigs, ZkConfigs}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ShareGroupConfig, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.storage.internals.log.CleanerConfig
Expand Down Expand Up @@ -1128,19 +1128,19 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG => // ignore string

/** Share groups configs */
case ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)


case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
Expand Down Expand Up @@ -1997,16 +1997,16 @@ class KafkaConfigTest {
props.putAll(kraftProps())

// Max should be greater than or equals to min.
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "20")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "10")
props.put(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "10")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))

// The timeout should be within the min-max range.
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "20")
props.put(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "5")
props.put(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "5")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "25")
props.put(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "25")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
}

Expand All @@ -2016,16 +2016,16 @@ class KafkaConfigTest {
props.putAll(kraftProps())

// Max should be greater than or equals to min.
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
props.put(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))

// The timeout should be within the min-max range.
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
props.put(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "5")
props.put(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "5")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "25")
props.put(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "25")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
}

Expand All @@ -2034,19 +2034,19 @@ class KafkaConfigTest {
val props = new Properties()
props.putAll(kraftProps())

props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10")
props.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10000000")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10000000")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))

// The duration should be within the min-max range.
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "1000")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "3600000")
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "999")
props.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "1000")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "3600000")
props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "999")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "3600001")
props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "3600001")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000")
props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000")
assertDoesNotThrow(() => KafkaConfig.fromProps(props))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
CleanerConfig.CONFIG_DEF,
LogConfig.SERVER_CONFIG_DEF,
ShareGroupConfigs.CONFIG_DEF,
ShareGroupConfig.CONFIG_DEF,
TransactionLogConfigs.CONFIG_DEF,
TransactionStateManagerConfigs.CONFIG_DEF,
QuorumConfig.CONFIG_DEF,
Expand Down
Loading