KIP-222 - Add Consumer Group operations to Admin API #4454
jeqo wants to merge 70 commits into apache:trunk from
Conversation
I did not take a deep look yet, but there are some problems that will probably require changes to the KIP. The group management protocol handles more than just consumer groups. Connect also uses it for its own group management. Connect groups do not have topic partition assignments, so the current
Additionally, I think there are a few other points to consider:
Just to clarify, if we go with the first option that I suggested above, we probably don't need a revote on the KIP. I doubt anyone will object if you just send an email to the discussion thread with the changes. Specifically, we should do the following:
Otherwise, the logic stays the same. We would just need to filter out groups which do not have the "consumer" protocol type. Note that I do think it's important to account for simple consumer groups as well. These can be identified as groups with an empty protocol type.
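The filtering rule described above can be sketched roughly as follows. This is a minimal, dependency-free illustration rather than code from the patch: `ConsumerGroupFilter`, `GroupOverview`, and `consumerGroupIds` are hypothetical names, and the sketch assumes each listed group carries a protocol type string that is `"consumer"` for regular consumer groups and empty for simple consumer groups.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumerGroupFilter {
    /** Hypothetical stand-in for one group entry returned by a ListGroups request. */
    static final class GroupOverview {
        final String groupId;
        final String protocolType;

        GroupOverview(String groupId, String protocolType) {
            this.groupId = groupId;
            // Assumption of this sketch: a missing protocol type is treated as empty.
            this.protocolType = protocolType == null ? "" : protocolType;
        }
    }

    /** Accepts "consumer" groups and simple consumer groups (empty protocol type). */
    static boolean isConsumerGroup(GroupOverview group) {
        return "consumer".equals(group.protocolType) || group.protocolType.isEmpty();
    }

    /** Returns the ids of the groups that pass the filter, preserving input order. */
    static List<String> consumerGroupIds(List<GroupOverview> groups) {
        List<String> ids = new ArrayList<>();
        for (GroupOverview group : groups) {
            if (isConsumerGroup(group)) {
                ids.add(group.groupId);
            }
        }
        return ids;
    }
}
```

With this rule, a group whose protocol type is anything else (for example one created by Connect) is dropped from the listing, while groups with an empty protocol type are kept.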
@hachikuji thanks for your feedback! Whether to cover group management for both Connect and consumer groups in this API change was one of the important points discussed in this KIP. As I understand it so far, Connect group details differ from consumer group details, so we will need to define separate APIs for each instead of trying to make them generic, as your second option proposes. Following your first option, we can add:
Simple Consumer Groups will be added to
As for adding the DeleteGroups API, I would be happy to, but I think we would need to hold off until #4479 is merged, right?
@jeqo I'd probably suggest sticking with consumer groups initially if we want to have any hope of getting this into the release. So if we have 1-3 in your list, we can leave general group APIs for future work. The nice thing about sticking with consumer groups is that it lets us expose the inner details from a consumer perspective. Here are a couple of concrete suggestions:
As far as DeleteGroups goes, we should probably only consider it if we manage to get 1-3 merged. Thoughts?
@hachikuji I have applied the changes mentioned above. Let me know what you think, and whether I should take this back to the KIP discussion to gather some feedback. I could propose moving Connect Group Management and Delete Consumer Group to a follow-up KIP.
hachikuji left a comment:
@jeqo Thanks for the updates. From an API perspective, this is looking good. However, the request logic needs a bit more work. I would suggest having a look at kafka.admin.AdminClient to understand the logic behind these requests.
I don't think it's reasonable to get this patch in the next day, so let's plan on getting this into the next feature release. With a little more time, we can also consider the delete API and non-consumer groups (potentially in a separate KIP).
    private final String groupId;
    private final boolean isSimpleConsumerGroup;
    private final List<MemberDescription> members;
    private final String protocol;
Maybe we can use the consumer-specific term "partitionAssignor"?
    }
    final long now = time.milliseconds();
    runnable.call(new Call("describeConsumerGroups", calcDeadlineMs(now, options.timeoutMs()),
        new ControllerNodeProvider()) {
The DescribeGroups API has to be sent to the group coordinator, which is potentially a different node for each group. You use the FindCoordinator API to look up the coordinator for a given group. The logic should be something like this:
1. For each group in the request, send a FindCoordinator request to any node in the cluster.
2. Group the results by coordinator id.
3. Send DescribeGroups to each coordinator from step 2.
Ideally, we should also handle retries correctly. It could happen that the coordinator moves to another node by the time we send DescribeGroups. In this case, the error code will be NOT_COORDINATOR. We should handle this by looking up the coordinator again.
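The lookup, batching, and retry logic described above could be sketched along these lines. This is a simplified, synchronous illustration using only the JDK, not the AdminClient implementation: `CoordinatorRouting`, `Cluster`, `groupByCoordinator`, and `describeAll` are hypothetical names, group descriptions are plain strings, and the stand-in `describeGroup` handles one group per call, whereas a real DescribeGroups request would carry the whole batch for a coordinator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

final class CoordinatorRouting {
    static final String NOT_COORDINATOR = "NOT_COORDINATOR";

    /** Hypothetical stand-in for the two broker requests involved. */
    interface Cluster {
        int findCoordinator(String groupId);               // plays the role of FindCoordinator
        String describeGroup(int nodeId, String groupId);  // plays the role of DescribeGroups
    }

    /** Looks up each group's coordinator and batches the groups by coordinator id. */
    static Map<Integer, List<String>> groupByCoordinator(List<String> groupIds,
                                                         Function<String, Integer> findCoordinator) {
        Map<Integer, List<String>> batches = new TreeMap<>();
        for (String groupId : groupIds) {
            int coordinatorId = findCoordinator.apply(groupId);
            batches.computeIfAbsent(coordinatorId, id -> new ArrayList<>()).add(groupId);
        }
        return batches;
    }

    /** Describes every group, repeating the coordinator lookup after NOT_COORDINATOR. */
    static Map<String, String> describeAll(Cluster cluster, List<String> groupIds, int maxAttempts) {
        Map<String, String> descriptions = new HashMap<>();
        List<String> pending = new ArrayList<>(groupIds);
        for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
            Map<Integer, List<String>> batches = groupByCoordinator(pending, cluster::findCoordinator);
            List<String> retry = new ArrayList<>();
            for (Map.Entry<Integer, List<String>> batch : batches.entrySet()) {
                for (String groupId : batch.getValue()) {
                    String result = cluster.describeGroup(batch.getKey(), groupId);
                    if (NOT_COORDINATOR.equals(result)) {
                        retry.add(groupId);   // coordinator moved: look it up again next round
                    } else {
                        descriptions.put(groupId, result);
                    }
                }
            }
            pending = retry;
        }
        if (!pending.isEmpty()) {
            throw new IllegalStateException("No coordinator found for groups: " + pending);
        }
        return descriptions;
    }
}
```

Bounding the number of attempts is a choice made for this sketch; an actual client would more likely tie retries to the request deadline.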
OK, I have pushed a draft following this description.
    }

    @Override
    public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
Unfortunately, the consumer groups are not aggregated in the same way that topic metadata is. To get all the groups in the cluster, you have to send the ListGroups request to all nodes.
I assume this is the same as the DescribeConsumerGroup API. If the changes are fine, I will update this and listConsumerGroupOffsets.
The logic is different for ListGroups. We have to send a separate request to every broker in the cluster and then aggregate the results.
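One way to picture the fan-out and aggregation is the sketch below. It uses `CompletableFuture` from the JDK as a stand-in for Kafka's own future type, and `ListGroupsFanOut`, `listAllGroups`, and the `listGroupsOnBroker` callback are hypothetical names introduced only for illustration; the callback represents sending a ListGroups request to a single broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ListGroupsFanOut {
    /**
     * Sends one (simulated) ListGroups request per broker and merges the answers
     * into a single sorted list once every broker has responded.
     */
    static CompletableFuture<List<String>> listAllGroups(
            List<Integer> brokerIds,
            Function<Integer, CompletableFuture<List<String>>> listGroupsOnBroker) {
        List<CompletableFuture<List<String>>> perBroker = new ArrayList<>();
        for (Integer brokerId : brokerIds) {
            perBroker.add(listGroupsOnBroker.apply(brokerId));
        }
        // allOf completes when all per-broker futures complete (or fails if any of them fails).
        return CompletableFuture.allOf(perBroker.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                TreeSet<String> merged = new TreeSet<>();
                for (CompletableFuture<List<String>> future : perBroker) {
                    merged.addAll(future.join()); // already complete, so join() does not block
                }
                List<String> result = new ArrayList<>(merged);
                return result;
            });
    }
}
```

The sorted set is used here only to make the merged output deterministic. In this sketch a failure on any single broker fails the whole result; whether partial results should be returned instead is a separate design question.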
@hachikuji while trying to test this, I found that the result of this operation has to change (similar to what the Scala API does now) to return Map<Node, KafkaFuture<List<ConsumerGroupListing>>> instead, so that it lists the responses from all nodes. Does this make sense? I can update the KIP accordingly.
I'd vote for doing the flattening internally rather than exposing the node information in the results, as this is an internal implementation detail that is better not leaked. In hindsight, the Scala API returning Map[Node, List[GroupOverview]] was not well designed.
    }

    @Override
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) {
Like DescribeGroups, we need to find the coordinator for the group to send the OffsetFetch request to.
hachikuji left a comment:
Apologies for the review delay. I will look in more depth once the 1.1 release has been finalized. In the meantime, I noticed there were no test cases. It would be a good idea to add both unit and integration tests for the new functionality.
Just to clarify, we will not expose the map inside the future to the user; the map will be maintained internally only during the second round-trip, as we need to check whether we have received responses from all of the broker nodes that were found. As I suggested, we should flatten it before exposing it to users. So the returned type of
…uests (apache#4842) This patch fixes an edge case in producer shutdown which prevents `close()` from completing due to a pending request which will never be sent due to shutdown initiation. I have added a test case which reproduces the scenario. Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Added consumer only workload to Trogdor. The topics must already be pre-populated. The spec lets the user request topic pattern and range of partitions to assign to [startPartition, endPartition]. Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
…e#4845) The toString() for ConfigResource was using { } instead of ( ) which is inconsistent with the existing toStrings in the code, while toString for Resource was using a mix of ( and }.
guozhangwang left a comment:
@jeqo I left some more comments on the return types of other Result classes.
Also, the Jenkins failures seem consistent; they need to be fixed before merging as well.
        this.futures = futures;
    }

    public KafkaFuture<Map<String, KafkaFuture<Void>>> values() {
I think we can just have one function between values and groups here. I'd suggest we use
public Map<TopicPartition, KafkaFuture<Void>> deletedGroups()
        return futures;
    }

    public KafkaFuture<Collection<String>> names() {
Where is this function used?
I'd suggest we only keep one function, i.e.
public Map<TopicPartition, KafkaFuture< ConsumerGroupDescription >> DescribeConsumerGroupsResult#values()
        });
    }

    public KafkaFuture<Collection<KafkaFuture<Void>>> all() {
For the all() function, the return type should be KafkaFuture<Void>; ditto for the other two Results as well.
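The shape being suggested, a per-item map of futures plus an `all()` that yields a single `Void` future, might look roughly like the sketch below. It is an illustration only: `DeleteGroupsResultSketch` is a hypothetical class, `CompletableFuture` from the JDK stands in for `KafkaFuture`, and keying the map by group id is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class DeleteGroupsResultSketch {
    private final Map<String, CompletableFuture<Void>> futures;

    DeleteGroupsResultSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    /** Per-group outcome, for callers that need to know which deletion failed. */
    Map<String, CompletableFuture<Void>> deletedGroups() {
        return futures;
    }

    /** A single future that succeeds only if every per-group future succeeds. */
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }
}
```

A caller who only cares about overall success waits on `all()`, while a caller who needs per-group errors inspects the map, so neither has to unwrap nested futures.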
    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
I'd suggest keeping only partitionsToOffsetAndMetadata here.
…che#4848) Move creation of quota callback instance out of KafkaConfig constructor to QuotaFactory.instantiate to avoid creating a callback instance for every KafkaConfig since we create temporary KafkaConfigs during dynamic config updates. Reviewers: Jun Rao <junrao@gmail.com>
retest this please
Reviewers: Jun Rao <junrao@gmail.com>
…249) (apache#4427) Reviewers: Jun Rao <junrao@gmail.com>
@jeqo could you rebase your PR?
Never mind, I've done the rebase and will merge it once the tests pass locally.
Unit tests passed locally, merging now.
@guozhangwang thanks!! :)
KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API Author: Jorge Quilcate Otoya <quilcate.jorge@gmail.com> Author: Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com> Author: Guozhang Wang <wangguoz@gmail.com> Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Guozhang Wang <wangguoz@gmail.com> Closes apache#4454 from jeqo/feature/admin-client-describe-consumer-group