KAFKA-14690: Add topic IDs to the OffsetCommit API version 9#13240
KAFKA-14690: Add topic IDs to the OffsetCommit API version 9#13240Hangleton wants to merge 76 commits intoapache:trunkfrom
Conversation
d09095b to
568cbe5
Compare
|
Hello David (@dajac), still working on this but opening a draft if you wish to start reviewing at your convenience. |
|
@Hangleton Thanks. I will take a look later this week. |
75b997e to
3b634e9
Compare
|
Thanks for the review, David. I am working on adding unit tests for |
00b0e10 to
f4f7dcd
Compare
…cNameOrNull; remove <p></p>; nullable -> if/else; revert RequestQuotaTest
…ents in OffsetCommitRequestTest
…dleOffsetCommitRequestTopicsAndPartitionsValidation the same
d2a813a to
892dd25
Compare
1496b9e to
0b0e649
Compare
…r.OffsetCommitRequestTest.
|
Hello David (@dajac), I was discussing this with Christo today as part of his work on the OffsetFetch API. Would you like this PR on OffsetCommit to be split to make the review easier and reduce risks? |
| if (topicName == null) { | ||
| // Could only happen if the broker replied with an ID which was not in the request and | ||
| // unknown by this client. This would be a bug. | ||
| log.warn("Ignoring invalid topic ID found in OffsetCommit response: " + topic.topicId()); | ||
| continue; | ||
| } |
There was a problem hiding this comment.
As a second thought, I wonder if we should complete the future with an exception here. Being defensive would help us to catch bugs early one. What do you think?
dajac
left a comment
There was a problem hiding this comment.
@Hangleton I am back one this one. I looked at the files in clients package and left a few minor comments.
| * - {@link Errors#INVALID_COMMIT_OFFSET_SIZE} | ||
| * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} | ||
| * - {@link Errors#GROUP_AUTHORIZATION_FAILED} | ||
| * - {@link Errors#STALE_MEMBER_EPOCH} |
There was a problem hiding this comment.
Should we remove this one for now as it is not implemented yet?
| // Version 9 adds TopicId field and can return STALE_MEMBER_EPOCH, UNKNOWN_MEMBER_ID | ||
| // and UNKNOWN_TOPIC_ID errors (KIP-848). |
There was a problem hiding this comment.
Should we remove STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID for now?
| assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs))); | ||
| } else { | ||
| AtomicBoolean callbackInvoked = new AtomicBoolean(); | ||
| coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> { |
There was a problem hiding this comment.
My understanding is that we don't retry when commitOffsetsAsync is used. Is it correct? If it is, it may be better to split the test in two. It is really misleading otherwise.
| } | ||
| } | ||
|
|
||
| private Map<TopicPartition, OffsetAndMetadata> testRetryCommitWithUnknownTopicIdSetup() { |
There was a problem hiding this comment.
nit: It may be better to name this one prepare.....
| } | ||
| } | ||
|
|
||
| private Map<TopicPartition, OffsetAndMetadata> testRetryCommitWithUnknownTopicIdSetup() { |
There was a problem hiding this comment.
nit: It may be better to name this one prepare.....
| Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>(); | ||
| byTopicNameOffsets.put(ti1p, 100L); | ||
| byTopicNameOffsets.put(ti2p, 200L); | ||
| byTopicNameOffsets.put(unknownTopicIdPartition, 300L); |
| // The following offset commit response defines a topic incorrectly. The coordinator ignores the topic, | ||
| // and the group authorization failure is therefore not propagated. | ||
| client.prepareResponse(offsetCommitResponse(topicName, Uuid.ZERO_UUID, Errors.GROUP_AUTHORIZATION_FAILED)); | ||
| assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE))); |
There was a problem hiding this comment.
I think that the method should return false if any mismatched topic id. If I commit foo-topic-id and bar-topic-id, the method should not succeed if we don't get a response for any of them, right?
| // The following offset commit responses defines a topic incorrectly. The coordinator ignores the topic, | ||
| // and the group authorization failure is therefore not propagated. | ||
| asserter.accept( | ||
| offsetCommitResponse(topicName, Uuid.ZERO_UUID, Errors.GROUP_AUTHORIZATION_FAILED), | ||
| null); |
There was a problem hiding this comment.
This case is not correct as well in my opinion. The caller should get an exception in this case.
| return topic; | ||
| } | ||
|
|
||
| public static final class NameAndId { |
There was a problem hiding this comment.
It is a bit weird to have this class defined here but I cannot think of a better place for now. Thoughts?
| ).build(version); | ||
| ))) | ||
| ), | ||
| true |
There was a problem hiding this comment.
nit: this seems to be misaligned.
|
Closing this PR for now as the topic id work will be done later. We can re-open it when we resume the work. |
KAFKA-14690: OffsetCommit API Version 9
This change introduces topic ids in
OffsetCommitrequests and responses. The approach chosen in the PR is to support references to topics by either their ID or name (but not both at the same time).Topic IDs are not surfaced to the Kafka consumer APIs.
On the consumer-side, topic IDs and names are resolved using the client-side cached metadata. On brokers, the metadata cache is used. If a topic ID cannot be resolved on the broker, the corresponding topic will be assigned an
UNKNOWN_TOPIC_IDin theOffsetCommitresponse.This PR is written as part of KIP-848: The Next Generation of the Consumer Rebalance Protocol.
Out of scope
CommitRequestManagerwill be added in KAFKA-14777.Request path on the consumer
Request path on the broker
Requests version >= 9 use topic ids exclusively. Brokers resolve the topic name in the request handler based on cached metadata. The request is then propagated to the group coordinator which constructs the response with topic names as of time of this PR. The resolution of topic name back to topic ID is performed when the response from the group coordinator is merged with the partial response for topic with error (if any).
Committer Checklist (excluded from commit message)