Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,11 @@ public static void require(boolean requirement) {
throw new IllegalArgumentException("requirement failed");
}

public static void require(boolean requirement, String errorMessage) {
if (!requirement)
throw new IllegalArgumentException(errorMessage);
}

/**
* Merge multiple {@link ConfigDef} into one
* @param configDefs List of {@link ConfigDef}
Expand Down
24 changes: 1 addition & 23 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
" to prevent unnecessary socket timeouts")
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor,
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")

val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet

// validate KRaft-related configs
Expand Down Expand Up @@ -1075,27 +1074,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")

// New group coordinator configs validation.
require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")

require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")

require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
import static org.apache.kafka.common.utils.Utils.require;

/**
* The group coordinator configurations.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comments why we create local attributes

* This configuration utilizes several local variables instead of calling AbstractConfig#get.... as all configs here
* are static and non-dynamic, with some being accessed extremely frequently (e.g., offsets.commit.timeout.ms).
* Using local variable is advantageous as it avoids the overhead of repeatedly looking up these configurations in AbstractConfig.
*/
public class GroupCoordinatorConfig {
/** ********* Group coordinator configuration ***********/
Expand Down Expand Up @@ -213,82 +217,152 @@ public class GroupCoordinatorConfig {
*/
public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000;

private final AbstractConfig config;
private final int numThreads;
private final int appendLingerMs;
private final int consumerGroupSessionTimeoutMs;
private final int consumerGroupHeartbeatIntervalMs;
private final int consumerGroupMaxSize;
private final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
private final int offsetsTopicSegmentBytes;
private final int offsetMetadataMaxSize;
private final int classicGroupMaxSize;
private final int classicGroupInitialRebalanceDelayMs;
private final int classicGroupMinSessionTimeoutMs;
private final int classicGroupMaxSessionTimeoutMs;
private final long offsetsRetentionCheckIntervalMs;
private final long offsetsRetentionMs;
private final int offsetCommitTimeoutMs;
private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
private final CompressionType offsetTopicCompressionType;
private final int offsetsLoadBufferSize;
private final int offsetsTopicPartitions;
private final short offsetsTopicReplicationFactor;
private final short offsetCommitRequiredAcks;
private final int consumerGroupMinSessionTimeoutMs;
private final int consumerGroupMaxSessionTimeoutMs;
private final int consumerGroupMinHeartbeatIntervalMs;
private final int consumerGroupMaxHeartbeatIntervalMs;

public GroupCoordinatorConfig(AbstractConfig config) {
this.config = config;
this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
this.appendLingerMs = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
this.consumerGroupAssignors = Collections.unmodifiableList(
config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class));
this.offsetsTopicSegmentBytes = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
this.offsetMetadataMaxSize = config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
this.classicGroupMaxSize = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
this.classicGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.classicGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
this.classicGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.offsetsRetentionCheckIntervalMs = config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
this.offsetsRetentionMs = config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L;
this.offsetCommitTimeoutMs = config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks ! I fixed all of them.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add UT to ensure the incorrect value can cause in contructor.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All comments are addressed in the latest commit, thanks

config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
this.offsetTopicCompressionType = Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. please add test. Also, maybe we should add validator to the def of OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG

.map(CompressionType::forId)
.orElse(null);
this.offsetsLoadBufferSize = config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
this.offsetsTopicPartitions = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
this.offsetsTopicReplicationFactor = config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
this.offsetCommitRequiredAcks = config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
this.consumerGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);

require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG));

// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs,
String.format("%s must be less than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));

require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
}

/**
* The number of threads or event loops running.
*/
public int numThreads() {
return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
return numThreads;
}

/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
*/
public int appendLingerMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
return appendLingerMs;
}

/**
* The consumer group session timeout in milliseconds.
*/
public int consumerGroupSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
return consumerGroupSessionTimeoutMs;
}

/**
* The consumer group heartbeat interval in milliseconds.
*/
public int consumerGroupHeartbeatIntervalMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
return consumerGroupHeartbeatIntervalMs;
}

/**
* The consumer group maximum size.
*/
public int consumerGroupMaxSize() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
return consumerGroupMaxSize;
}

/**
* The consumer group assignors.
*/
public List<ConsumerGroupPartitionAssignor> consumerGroupAssignors() {
return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class);
return consumerGroupAssignors;
}

/**
* The offsets topic segment bytes should be kept relatively small to facilitate faster
* log compaction and faster offset loads.
*/
public int offsetsTopicSegmentBytes() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
return offsetsTopicSegmentBytes;
}

/**
* The maximum size for a metadata entry associated with an offset commit.
*/
public int offsetMetadataMaxSize() {
return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
return offsetMetadataMaxSize;
}

/**
* The classic group maximum size.
*/
public int classicGroupMaxSize() {
return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
return classicGroupMaxSize;
}

/**
* The delay in milliseconds introduced for the first rebalance of a classic group.
*/
public int classicGroupInitialRebalanceDelayMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
return classicGroupInitialRebalanceDelayMs;
}

/**
Expand All @@ -302,21 +376,21 @@ public int classicGroupNewMemberJoinTimeoutMs() {
* The classic group minimum session timeout.
*/
public int classicGroupMinSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
return classicGroupMinSessionTimeoutMs;
}

/**
* The classic group maximum session timeout.
*/
public int classicGroupMaxSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
return classicGroupMaxSessionTimeoutMs;
}

/**
* Frequency at which to check for expired offsets.
*/
public long offsetsRetentionCheckIntervalMs() {
return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
return offsetsRetentionCheckIntervalMs;
}

/**
Expand All @@ -334,55 +408,52 @@ public long offsetsRetentionCheckIntervalMs() {
* committed offsets for that topic will also be deleted without extra retention period.
*/
public long offsetsRetentionMs() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L;
return offsetsRetentionMs;
}

/**
* Offset commit will be delayed until all replicas for the offsets topic receive the commit
* or this timeout is reached
*/
public int offsetCommitTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
return offsetCommitTimeoutMs;
}

/**
* The config indicating whether group protocol upgrade/downgrade are allowed.
*/
public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() {
return ConsumerGroupMigrationPolicy.parse(
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
return consumerGroupMigrationPolicy;
}

/**
* The compression type used to compress records in batches.
*/
public CompressionType offsetTopicCompressionType() {
return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))
.map(CompressionType::forId)
.orElse(null);
return offsetTopicCompressionType;
}

/**
* Batch size for reading from the offsets segments when loading offsets into
* the cache (soft-limit, overridden if records are too large).
*/
public int offsetsLoadBufferSize() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
return offsetsLoadBufferSize;
}

/**
* The number of partitions for the offset commit topic (should not change after deployment).
*/
public int offsetsTopicPartitions() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
return offsetsTopicPartitions;
}

/**
* The replication factor for the offsets topic (set higher to ensure availability).
* Internal topic creation will fail until the cluster size meets this replication factor requirement.
*/
public short offsetsTopicReplicationFactor() {
return config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
return offsetsTopicReplicationFactor;
}

/**
Expand All @@ -391,34 +462,34 @@ public short offsetsTopicReplicationFactor() {
*/
@Deprecated // since 3.8
public short offsetCommitRequiredAcks() {
return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
return offsetCommitRequiredAcks;
}

/**
* The minimum allowed session timeout for registered consumers.
*/
public int consumerGroupMinSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
return consumerGroupMinSessionTimeoutMs;
}

/**
* The maximum allowed session timeout for registered consumers.
*/
public int consumerGroupMaxSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
return consumerGroupMaxSessionTimeoutMs;
}

/**
* The minimum heartbeat interval for registered consumers.
*/
public int consumerGroupMinHeartbeatIntervalMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
return consumerGroupMinHeartbeatIntervalMs;
}

/**
* The maximum heartbeat interval for registered consumers.
*/
public int consumerGroupMaxHeartbeatIntervalMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
return consumerGroupMaxHeartbeatIntervalMs;
}
}
Loading