KAFKA-17081 Tweak GroupCoordinatorConfig: re-introduce local attributes and validation#16524
KAFKA-17081 Tweak GroupCoordinatorConfig: re-introduce local attributes and validation#16524chia7712 merged 5 commits intoapache:trunkfrom
Conversation
…es and validation
chia7712
left a comment
There was a problem hiding this comment.
@brandboat thanks for this patch.
| import static org.apache.kafka.common.utils.Utils.require; | ||
|
|
||
| /** | ||
| * The group coordinator configurations. |
There was a problem hiding this comment.
please add comments why we create local attributes
| this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); | ||
| this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); | ||
| this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); | ||
| this.consumerGroupAssignors = config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); |
There was a problem hiding this comment.
could you add test to make sure incorrect value can cause error since CONSUMER_GROUP_ASSIGNORS_CONFIG` has no validator?
There was a problem hiding this comment.
@brandboat Have you added test for CONSUMER_GROUP_ASSIGNORS_CONFIG?
There was a problem hiding this comment.
For another, could you make consumerGroupAssignors immutable?
There was a problem hiding this comment.
@brandboat Have you added test for CONSUMER_GROUP_ASSIGNORS_CONFIG?
I think It's already there before I submit this pr.
For another, could you make consumerGroupAssignors immutable?
Will do
| this.offsetCommitTimeoutMs = config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); | ||
| this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse( | ||
| config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)); | ||
| this.offsetTopicCompressionType = Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)) |
There was a problem hiding this comment.
ditto. please add test. Also, maybe we should add validator to the def of OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG
| this.offsetsRetentionCheckIntervalMs = config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); | ||
| this.offsetsRetentionMs = config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; | ||
| this.offsetCommitTimeoutMs = config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); | ||
| this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse( |
There was a problem hiding this comment.
Thanks ! I fixed all of them.
There was a problem hiding this comment.
Please add UT to ensure the incorrect value can cause in contructor.
There was a problem hiding this comment.
All comments are addressed in the latest commit, thanks
chia7712
left a comment
There was a problem hiding this comment.
@brandboat thanks for updated patch
| this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); | ||
| this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); | ||
| this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); | ||
| this.consumerGroupAssignors = config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); |
There was a problem hiding this comment.
@brandboat Have you added test for CONSUMER_GROUP_ASSIGNORS_CONFIG?
| this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG); | ||
| this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); | ||
| this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG); | ||
| this.consumerGroupAssignors = config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class); |
There was a problem hiding this comment.
For another, could you make consumerGroupAssignors immutable?
| this.offsetsRetentionCheckIntervalMs = config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG); | ||
| this.offsetsRetentionMs = config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L; | ||
| this.offsetCommitTimeoutMs = config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); | ||
| this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse( |
There was a problem hiding this comment.
Please add UT to ensure the incorrect value can cause in contructor.
…es and validation (apache#16524) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
related to https://issues.apache.org/jira/browse/KAFKA-17081
address comments in #16458 (comment)
Committer Checklist (excluded from commit message)