KAFKA-14511: extend AlterIncrementalConfigs API to support group config#15067
KAFKA-14511: extend AlterIncrementalConfigs API to support group config#15067dajac merged 22 commits intoapache:trunkfrom
Conversation
AndrewJSchofield
left a comment
There was a problem hiding this comment.
KIP-848 introduces the new GROUP resource type with the intent that it will be used for more than just consumer groups in the future.
# Conflicts: # core/src/main/scala/kafka/zk/AdminZkClient.scala # core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
|
@AndrewJSchofield, I've updated the PR. Please take a look again. Thanks. |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
This PR is looking pretty good now. Thanks.
# Conflicts: # core/src/main/scala/kafka/server/KafkaConfig.scala
|
Thanks @AndrewJSchofield for the feedback, I have addressed comments. |
|
@dajac, PTAL, thanks in advance. |
# Conflicts: # core/src/main/scala/kafka/server/BrokerServer.scala # core/src/main/scala/kafka/server/KafkaConfig.scala # group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java # group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
|
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
|
@DL1231 I'm interested in getting group configs working. This PR is quite out of date, in particular because there's been a lot of refactoring of configs in Kafka recently. Would you like to rebase it and get it working again? Alternatively, I'm happy to take the work on instead. |
# Conflicts: # core/src/main/scala/kafka/security/authorizer/AclEntry.scala # core/src/main/scala/kafka/server/BrokerServer.scala # core/src/main/scala/kafka/server/ConfigHandler.scala # core/src/main/scala/kafka/server/KafkaConfig.scala # core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala # core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala # group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java # group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java # group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java # group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java # group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
|
@AndrewJSchofield I've resolved the conflict, PTAL, thanks in advance. |
0657ace to
ab95c05
Compare
|
@DL1231 Yes, I'll give the updated code a detailed review in the next few days. Thanks for rebasing it. |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the updated PR. I left a few comments.
# Conflicts: # core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
|
@AndrewJSchofield I've updated the PR. Please take a look again. Thanks. |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
lgtm. Thanks for the PR.
|
|
||
| @ParameterizedTest | ||
| @ValueSource(strings = Array("kraft+kip848")) | ||
| def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = { |
There was a problem hiding this comment.
nit: testDynamicGroupConfigChangeWithInvalidValue?
There was a problem hiding this comment.
Consider aligning with testIncrementalAlterDefaultTopicConfig
| val resource = new ConfigResource(ConfigResource.Type.GROUP, "") | ||
| val op = new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "200000"), OpType.SET) | ||
| val future = admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all | ||
| TestUtils.assertFutureExceptionTypeEquals(future, classOf[InvalidRequestException]) |
There was a problem hiding this comment.
I wonder if it should rather be a InvalidConfigurationException exception. What's the reasoning for using InvalidRequestException?
There was a problem hiding this comment.
Consider aligning with testIncrementalAlterDefaultTopicConfig
| super(CONFIG, props, false); | ||
| } | ||
|
|
||
| public static Set<String> configNames() { |
There was a problem hiding this comment.
configNames is only used in validateNames. Should we remove it?
There was a problem hiding this comment.
It is also used in testFromPropsInvalid.
# Conflicts: # core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
|
I'll be away until 7/29. I will continue reviewing this PR when I come back. |
|
@dajac Sorry for the delay, I've updated the PR, PTAL when you get a chance. |
# Conflicts: # group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java # group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
|
@DL1231 I am back. Thanks for the update. I will review your PR asap. |
| CLIENT_METRICS((byte) 16), | ||
| BROKER_LOGGER((byte) 8), | ||
| BROKER((byte) 4), | ||
| GROUP((byte) 3), |
There was a problem hiding this comment.
I disagree with this change. I think that GROUP should use 32 in order to stay consistent with the existing values.
| } | ||
|
|
||
| /** | ||
| * Create a group config instance using the given properties and defaults |
There was a problem hiding this comment.
nit: Let's add a . at the end of the sentence for consistency.
| /** | ||
| * Copy the subset of properties that are relevant to consumer group. | ||
| */ | ||
| def extractGroupConfigMap: java.util.Map[String, Int] = { | ||
| val groupProps = new java.util.HashMap[String, Int]() | ||
| groupProps.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, groupCoordinatorConfig.consumerGroupSessionTimeoutMs) | ||
| groupProps.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs) | ||
| groupProps | ||
| } |
There was a problem hiding this comment.
Would it make sense to move this method to GroupCoordinatorConfig? GroupCoordinatorConfig has changed quite a lot since this PR was started. It seems to me that keeping all the group related logic there would make sense. What do you think?
If we do this, we could also simplify GroupConfigManager.validate.
There was a problem hiding this comment.
Yes it makes sense to me. Moved this method to GroupCoordinatorConfig.
# Conflicts: # core/src/main/scala/kafka/server/BrokerServer.scala # core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
|
Hi @dajac. Thanks a lot for the review. I have made the required changes in the last commit, PTAL when you get a chance. |
767e756 to
10b9d24
Compare
|
I spoke too quickly. @DL1231 There are related failed tests. For instance:
|
|
@DL1231 The last build failed with compilation errors. Could you please check? I would also advice to not rebase and force-push from now on. It will be easier for me to follow the fixes. Otherwise, I have to go through the entire PR. |
This PR add resources to store and handle consumer group config. jira
Changes include:
Committer Checklist (excluded from commit message)