KIP-396: Add AlterConsumerGroup/List Offsets to AdminClient #7296

Merged
hachikuji merged 12 commits into apache:trunk from mimaison:kip-396
Oct 20, 2019
Conversation

@mimaison
Member

@mimaison mimaison commented Sep 4, 2019

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mimaison
Member Author

@guozhangwang @vahidhashemian @bbejeck @harshach @cmccabe @hachikuji As you all voted on this KIP, can a few of you review the PR? Thanks

Contributor

@ryannedolan ryannedolan left a comment


Looking forward to this!

Comment thread core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/Admin.java Outdated
Contributor

@guozhangwang guozhangwang left a comment


I did a quick look at the PR and it looks good, would try to squeeze out some time with a thorough review.

@mimaison
Member Author

@guozhangwang @vahidhashemian @bbejeck @harshach @cmccabe @hachikuji I'd love to get this in 2.4, can you take a look? It's a relatively straightforward KIP/PR.

@HeartSaVioR

I can't help with reviewing, but this would be really helpful for the Kafka integration in Spark. Looking forward to the feature!

Contributor

@hachikuji hachikuji left a comment


Thanks, left a few initial comments.

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/Admin.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/IsolationLevel.java Outdated
@mimaison mimaison force-pushed the kip-396 branch 2 times, most recently from e0c6be4 to 412608e on October 4, 2019 02:24
@mimaison
Member Author

mimaison commented Oct 4, 2019

Thanks @hachikuji and @guozhangwang for the feedback!

I've pushed an update:

  • addressing all the minor issues
  • hiding the ListOffsetRequest magic values
  • adding some retry logic for fetching metadata, re-using the pattern from FindCoordinator

I'm flying back to the UK tomorrow evening but I should be able to make more changes tomorrow if needed. If we're not happy with the metadata retry logic, maybe a temporary solution would be to remove it and keep using the Consumer in the consumer group tool for now. Then next week I can revisit it. What do you think?

Contributor

@hachikuji hachikuji left a comment


Thanks for the updates, left some more comments.

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/AlterOffsetsResult.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
@mimaison
Member Author

mimaison commented Oct 4, 2019

@hachikuji Thanks again for the review. I've pushed an update. I'll start adding coverage in KafkaAdminClientTest now

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Contributor


Not something we have to do here, but one way we could improve this in the future is by taking into account leader epoch information from individual partitions. We can ensure that epochs increase monotonically in order to prevent using stale information during retry.

Another thing we could do is reduce the topics we are fetching metadata for as the ListOffsets requests complete. Ideally we'd only be refetching metadata for topics with metadata errors.
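
To make the second idea concrete, here is a minimal hedged sketch of the narrowing (the class and method names are illustrative, not the actual KafkaAdminClient internals): after each round of responses, only topics that actually hit a retriable metadata error would be kept in the next metadata fetch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: given per-topic outcomes from a ListOffsets round,
// keep only the topics whose partitions saw a metadata error, so the next
// Metadata request is scoped to just those topics.
class MetadataRefetchSketch {
    static Set<String> topicsToRefetch(Map<String, Boolean> metadataErrorByTopic) {
        Set<String> retry = new HashSet<>();
        for (Map.Entry<String, Boolean> e : metadataErrorByTopic.entrySet()) {
            if (e.getValue())  // true = this topic had a retriable metadata error
                retry.add(e.getKey());
        }
        return retry;
    }
}
```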

Member Author


Yes, these improvements would be nice. For now I've kept it very simple and just made it retry the full metadata request every time.

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Contributor

@hachikuji hachikuji left a comment


@mimaison Thanks, found a couple more problems that we need to fix before we can merge. There will be a few follow-ups as well that we can save for next week.

Contributor


I'm a bit confused what's going on with this API. We first wait on the aggregate future and then we wrap it in another future. That seems wrong, right? The call to all() shouldn't itself block.
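
To illustrate the distinction, here is a minimal sketch using java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture (the class names are illustrative, not the actual AdminClient types): all() should compose the underlying futures into one, never call get() on them.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Illustrative result wrapper: all() composes the per-partition futures
// into a single aggregate future without blocking the calling thread.
class NonBlockingResultSketch {
    private final Collection<CompletableFuture<Void>> futures;

    NonBlockingResultSketch(Collection<CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    CompletableFuture<Void> all() {
        // No get() here: the returned future completes only once every
        // underlying future has completed.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}
```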

Member Author


Yes that is wrong.

Contributor


I don't think we need the nested futures here. The new api just works with a single group, so seems like the type should just be KafkaFuture<Map<TopicPartition, Void>>. Also, note that we don't want to return KafkaFutureImpl directly.

Member Author


Yes. For consistency, it's actually best to have KafkaFuture<Map<TopicPartition, Errors>>. So it's the same as deleteConsumerGroupOffsets().

Contributor


Huhmm... Actually I think that's a mistake in deleteConsumerGroupOffsets. We don't want to expose Errors directly. I will submit a separate PR.

Contributor


Let's just use the same two APIs from deleteConsumerGroupOffsets:

    public KafkaFuture<Void> partitionResult(final TopicPartition partition);
    public KafkaFuture<Void> all();
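
A hedged sketch of the shape being suggested (using CompletableFuture and String keys as stand-ins for KafkaFuture and TopicPartition; the names are illustrative, not the merged implementation):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative result class exposing the same two APIs as
// DeleteConsumerGroupOffsetsResult: a per-partition future, plus an
// aggregate all() that succeeds only if every partition succeeded.
class AlterOffsetsResultSketch {
    private final Map<String, CompletableFuture<Void>> partitionFutures;

    AlterOffsetsResultSketch(Map<String, CompletableFuture<Void>> partitionFutures) {
        this.partitionFutures = partitionFutures;
    }

    public CompletableFuture<Void> partitionResult(String partition) {
        CompletableFuture<Void> future = partitionFutures.get(partition);
        if (future == null)  // partition was never part of the request
            throw new IllegalArgumentException("Partition " + partition + " was not attempted");
        return future;
    }

    public CompletableFuture<Void> all() {
        return CompletableFuture.allOf(
            partitionFutures.values().toArray(new CompletableFuture[0]));
    }
}
```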

Contributor


This is a good catch, thanks @hachikuji , we can address 8992 within the 2.4 deadline.

@mimaison
Member Author

mimaison commented Oct 7, 2019

Gathering the TODOs we identified:

@hachikuji
Contributor

@mimaison Note the compilation failure:

06:06:20 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:394: not found: type AlterOffsetsOptions
06:06:20                     withTimeoutMs(new AlterOffsetsOptions)

@mimaison
Member Author

mimaison commented Oct 7, 2019

Thanks @hachikuji, fixed

Contributor

@hachikuji hachikuji left a comment


Thanks, I think we're almost there, but still a couple problems to fix.

Contributor


Hmm.. This is a little different from what we have in DeleteConsumerGroupOffsetsResult. I think it makes sense to check all the partition level errors. cc @dajac

Member


That's a fair point, but I am not sure what the best option is. The rationale behind not looking at individual topic/partitions was that it allows using all() to wait for the completion of the request and then checking the individual results. In this case, all() fails only if the whole group has failed.

To be more concrete, it allows to do the following:

DeleteConsumerGroupOffsetsResult result = ...;

try {
  // wait for the whole group; only raises when a group-level or
  // transport-level exception affecting the whole request occurs
  result.all().get();

  // inspect an individual topic/partition
  try {
    result.partitionResult(...).get();
  } catch (Exception e) {
    // handle partition-level exception
  }
} catch (Exception e) {
  // handle group-level exception
}

I think that this facilitates the error handling. What do you think?

Contributor


That's an interesting point. I think the usual semantics of all is to only succeed if all individual operations have succeeded. It's sort of designed for lazy error handling I guess. If users care about the individual operations, they can check them individually. Otherwise they have a convenient way to check for any errors. Based on what I've seen, this tends to be the most frequent use. I think also part of the idea is to abstract away from the underlying requests. Some of the admin APIs result in multiple broker requests which makes exposing the full granularity of errors quite cumbersome.
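
These semantics can be demonstrated with CompletableFuture.allOf as a stand-in for KafkaFuture.allOf (illustrative only): one individual failure fails the aggregate, while the individual futures remain inspectable for callers who care about per-operation results.

```java
import java.util.concurrent.CompletableFuture;

// Demonstrates the "succeed only if all individual operations succeeded"
// contract: the aggregate future completes exceptionally if the set
// contains any failed future.
class AllOfSemanticsSketch {
    static CompletableFuture<Void> all(CompletableFuture<?>... futures) {
        return CompletableFuture.allOf(futures);
    }
}
```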

Contributor


Just made a pass on all XXXResult classes and I think the API semantics are a bit inconsistent in general. Originally I thought we only need the all function if the result contains futures in the form of Map<..., KafkaFuture<...>>, which potentially requires one round trip for each nested future; the all function is then a lazy way to check that all entries have completed successfully. But some results (e.g. RemoveMemberFromGroupResult, in the form of Map<MemberIdentity, KafkaFuture<Void>>) actually only require one request too, so all their futures would always complete at the same time. For those cases we do not need an all function either.

But it seems like for results that only contain a KafkaFuture<Object> we also have a dummy all function, and many of their all semantics differ too.

Honestly I think not all results need an all function, but it seems we are already a bit messy here..

Contributor


Yeah, unfortunately the admin APIs have such a big surface area it's hard to maintain consistency. I think the original intent is what I described though.

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
@hachikuji
Contributor

@mimaison I see the recent comments were marked resolved, but I don't see the changes. Are you still working on an update?

@mimaison
Member Author

mimaison commented Oct 9, 2019 via email

@mimaison
Member Author

@hachikuji I've pushed an update

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
@mimaison
Member Author

Thanks @hachikuji for the feedback, I've pushed another update

Contributor

@hachikuji hachikuji left a comment


Thanks @mimaison. I posted a final round of comments and then we can merge.

Contributor


The user is trying to access a partition that was not requested. I think we could raise IllegalArgumentException directly to the user.

Contributor


This is a bit subtle, but I think we want to raise the InvalidMetadataException rather than constructing a new Call. The problem is that we lose the retry bookkeeping which means these retries will not respect the backoff. By throwing the exception, we let the retry logic in Call.fail kick in. This would be consistent with the logic in getFindCoordinatorCall.
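
The underlying concern can be sketched generically (this is not the AdminClient's internal Call machinery, just an illustration of why centralized retry bookkeeping matters): a shared retry handler counts attempts and applies backoff, whereas constructing a fresh request on each failure bypasses both.

```java
// Generic illustration: a shared retry loop tracks the attempt count and
// honors the backoff; an ad-hoc new request on each failure would do neither.
class RetryBookkeepingSketch {
    interface Attempt { void run() throws Exception; }

    static int runWithRetries(Attempt attempt, int maxRetries, long backoffMs)
            throws Exception {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                return attempts;  // number of attempts it took to succeed
            } catch (Exception e) {
                if (attempts > maxRetries)
                    throw e;              // retries exhausted, surface the error
                Thread.sleep(backoffMs);  // honor the backoff between attempts
            }
        }
    }
}
```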

@mimaison
Member Author

Thanks @hachikuji, I've updated the PR and rebased on trunk

Contributor

@hachikuji hachikuji left a comment


LGTM. Thanks for the patch!

@hachikuji
Contributor

retest this please

@guozhangwang
Contributor

LGTM!

@hachikuji hachikuji merged commit 99a4068 into apache:trunk Oct 20, 2019
@HeartSaVioR

Amazing! Thanks all for the efforts on this!

@gaborgsomogyi

Thanks guys, starting the Spark integration part.
