KAFKA-6791: Add CoordinatorNodeProvider in KafkaAdminClient#4902

Closed
huxihx wants to merge 5 commits into apache:trunk from huxihx:KAFKA-6791

Contributor

@huxihx huxihx commented Apr 20, 2018

KAFKA-6791: Add CoordinatorNodeProvider in KafkaAdminClient
https://issues.apache.org/jira/browse/KAFKA-6791

Add CoordinatorNodeProvider interface to support batch retrieval for group coordinators.

huxi-2b added 2 commits April 20, 2018 11:28
https://issues.apache.org/jira/browse/KAFKA-6791

Add `CoordinatorNodeProvider` interface and its implementor `ConsumerGroupCoordinatorNodeProvider` to support batch retrieval for the group coordinators.
@huxihx
Contributor Author

huxihx commented Apr 20, 2018

retest it please

@huxihx
Contributor Author

huxihx commented Apr 20, 2018

@guozhangwang Please kindly review. Thanks

@guozhangwang guozhangwang requested a review from hachikuji April 20, 2018 15:55
@guozhangwang
Contributor

cc @hachikuji @cmccabe for reviews as well.

int nodeId = coordinator == null ? -1 : coordinator.id(); // leave null-handling to the next NodeProvider

runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) {
Contributor

The ideal would be to use the CoordinatorNodeProvider here. There is not much benefit in having it if we just invoke it inline. The problem is that the provide() method is called by the send thread, so we cannot have it block on an operation which itself depends on the send thread. To make it work nicely in this way, we probably need an asynchronous NodeProvider API which effectively lets us chain the DescribeGroup request on to its completion. For example, maybe something like this could work:

interface AsyncNodeProvider {
  KafkaFuture<Node> provide();
}

cc @cmccabe (who may have some ideas as well)
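
The chaining idea above can be sketched roughly as follows. This is only an illustration under stated assumptions: `CompletableFuture` stands in for `KafkaFuture`, the nested `Node` class stands in for `org.apache.kafka.common.Node`, a single-threaded executor plays the role of the send thread, and the coordinator id `3` and the `describeGroup` helper are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncNodeProviderSketch {
    // Stand-in for org.apache.kafka.common.Node; only the id matters here.
    static final class Node {
        final int id;
        Node(int id) { this.id = id; }
    }

    // Same shape as the proposed interface, with CompletableFuture in place of KafkaFuture.
    interface AsyncNodeProvider {
        CompletableFuture<Node> provide();
    }

    // Hypothetical helper: looks up the coordinator on the send thread, then
    // chains the describe step onto the lookup's completion.
    static String describeGroup(ExecutorService sendThread, String groupId) throws Exception {
        // Pretend the coordinator lookup answered with node 3.
        AsyncNodeProvider coordinatorProvider =
            () -> CompletableFuture.supplyAsync(() -> new Node(3), sendThread);

        CompletableFuture<String> result = coordinatorProvider.provide()
            .thenApplyAsync(
                node -> "describeConsumerGroups(" + groupId + ") -> node " + node.id,
                sendThread);

        // Only the calling thread waits; the send thread never blocks on itself.
        return result.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService sendThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println(describeGroup(sendThread, "my-group"));
        } finally {
            sendThread.shutdown();
        }
    }
}
```

Both steps run on the same single thread, one after the other, so neither has to wait inside the thread for the other to finish.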

Contributor Author

@hachikuji Correct me if I am wrong. Coordinator lookup must always finish before any group-related task runs, whether we use an async or a sync interface, so we have to wait for it in either case. What we should do is ensure that the blocking time is bounded.

Contributor

@hachikuji is correct. We can't do blocking operations in the admin client service thread. We certainly can't do blocking operations that wait for the service thread itself. This will deadlock.

I think it's a good idea to have a coordinator node provider, but we need to build out a little more infrastructure to make it possible. I have a change which should help with that, at #4295
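
The self-wait problem can be reproduced in miniature. In this sketch a single-threaded executor is assumed to stand in for the admin client service thread, and the `blocksOnItself` helper is hypothetical. The timeout exists only so the demonstration terminates; without it the outer task would wait forever.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ServiceThreadDeadlockSketch {
    // Returns true if a task running on the single service thread timed out
    // while waiting for work that was queued on that same thread.
    static boolean blocksOnItself(ExecutorService serviceThread, long timeoutMs) throws Exception {
        Future<Boolean> outer = serviceThread.submit(() -> {
            // Queued behind the task that is currently running, so it cannot start yet.
            Future<Integer> inner = serviceThread.submit(() -> 3);
            try {
                inner.get(timeoutMs, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true; // the inner task never got a chance to run
            }
        });
        return outer.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService serviceThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println(blocksOnItself(serviceThread, 200)); // prints true
        } finally {
            serviceThread.shutdown();
        }
    }
}
```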

Contributor Author

@hachikuji Do you think it's okay to convert this async operation into a sync one, similar to the following:

TopicPartition tp = new TopicPartition("__consumer_offsets", Math.abs(groupID.hashCode() % 50));
return metadata.fetch().leaderFor(tp);

Contributor

It's an interesting thought, but users may override the number of partitions for __consumer_offsets, so I don't think it will work. More generally, we are trying to avoid dependence in the clients on the __consumer_offsets topic since it ties the behavior of the client to what is more properly an implementation detail.
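
To make the first objection concrete, here is a small sketch. The `partitionFor` helper is hypothetical: it only parameterizes the modulus from the snippet above and is not claimed to match how the broker or client actually maps groups to partitions. It shows that the same group ID lands on a different partition once the partition count (set on the broker via `offsets.topic.num.partitions`) differs from the hard-coded 50.

```java
public class OffsetsPartitionSketch {
    // Hypothetical helper: the expression from the snippet above,
    // with the hard-coded 50 turned into a parameter.
    static int partitionFor(String groupId, int numPartitions) {
        return Math.abs(groupId.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        String groupId = "g1"; // "g1".hashCode() == 3242
        System.out.println(partitionFor(groupId, 50)); // prints 42
        System.out.println(partitionFor(groupId, 10)); // prints 2
    }
}
```

A client that assumed 50 partitions would therefore ask for the leader of the wrong partition on a cluster configured with 10.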

@huxihx
Contributor Author

huxihx commented May 2, 2018

@hachikuji Please review again. Thanks.

@huxihx
Contributor Author

huxihx commented May 6, 2018

retest it please

@cmccabe
Contributor

cmccabe commented May 15, 2018

@huxihx: thanks for the PR. I don't think this is needed any more, though, now that we merged KAFKA-6299.

@huxihx
Contributor Author

huxihx commented May 22, 2018

@cmccabe Sorry for the late response. Okay, will cancel this PR soon.

@huxihx
Contributor Author

huxihx commented May 22, 2018

Closed this PR since it was already fixed by KAFKA-6299.
