Skip to content

KAFKA-17331: Throw UnsupportedVersionException if the data in ListOffsetRequest does NOT fit EarliestLocalSpec and LatestTieredSpec.#16876

Merged
chia7712 merged 13 commits intoapache:trunkfrom
frankvicky:KAFKA-17331
Aug 25, 2024
Merged

Conversation

@frankvicky
Copy link
Copy Markdown
Contributor

JIRA: KAFKA-17331

Add the version check to client side when building ListOffsetRequest for the specific timestamp:

  • the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
  • the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for this patch

Comment thread clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java Outdated
@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712
I have addressed the comments, PTAL

Copy link
Copy Markdown
Member

@FrankYang0529 FrankYang0529 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Leave one comment.

public ListOffsetsRequest build(short version) {
for (ListOffsetsTopic topic : data.topics()) {
for (ListOffsetsPartition partition : topic.partitions()) {
checkVersion(version, topic.name(), partition);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The failed case ListOffsetsRequestTest#testResponseDefaultOffsetAndLeaderEpochForAllVersions can be reproduced on my laptop. Could you take a look? Thank you.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I will take a look!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky Could you please add a helper to build invalid request in order to run test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @FrankYang0529 and @chia7712
I have reviewed the code. Since this error is due to a version check in the build method, I will change assertEqual to assertThrows when the test tries to build an invalid request to align with the current behavior.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky What we want to protect is the request from clients (admin and consumer). So maybe we should revise fromConsumer[0] rather than build.

WDYT?

[0]

public static Builder forConsumer(boolean requireTimestamp,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benefit is that we can leverage the API_VERSION check of network layer. The disadvantage is that it requires the API_VERSION, and fortunately that is existent in consumer and admin.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712
I have reviewed the code. Since forConsumer is a static method, if we aim to perform a version check in this method, we will need to add a new argument. I will try to implement it.

boolean requireMaxTimestamp,
boolean requireTieredStorageTimestamp) {
boolean requireTieredStorageTimestamp,
List<ListOffsetsTopic> topics) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please follow the pattern? You can add a flag to pick up the min version. Current implementation does not consider the version=8 and I feel that is a kind of bug. @FrankYang0529 WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712
I have add a requireEarliestLocalTimestamp parameter to forConsumer method, PTAL

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should update this. Thanks @chia7712 and @frankvicky

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for this patch

return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);

short version = ApiKeys.LIST_OFFSETS.latestVersion();
topics.forEach(topic -> topic.partitions().forEach(partition -> checkVersion(version, topic.name(), partition)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After setting the min version, the version checkcan handled by NetworkClient [0]. Hence, we don't need to add extra check here.

[0] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L528

@chia7712
Copy link
Copy Markdown
Member

@frankvicky I feel the refactor is a bit overkill. Maybe we can add following two methods:

        public static Builder forConsumer(boolean requireTimestamp,
                                          IsolationLevel isolationLevel) {
            return forConsumer(requireTimestamp, isolationLevel, false, false, false);
        }

        public static Builder forConsumer(boolean requireTimestamp,
                                          IsolationLevel isolationLevel,
                                          boolean requireMaxTimestamp,
                                          boolean requireEarliestTimestamp,
                                          boolean requireTieredStorageTimestamp) {
            short minVersion = 0;
            if (requireTieredStorageTimestamp)
                minVersion = 9;
            else if (requireEarliestTimestamp)
                minVersion = 8;
            else if (requireMaxTimestamp)
                minVersion = 7;
            else if (isolationLevel == IsolationLevel.READ_COMMITTED)
                minVersion = 2;
            else if (requireTimestamp)
                minVersion = 1;
            return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
        }

Those two methods should cover all use cases for now.

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712
I have refactor the forConsumer method, PTAL

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for this patch. a couple of comments are left.

ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
.forConsumer(requireTimestamps, isolationLevel, false, false)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
List<ListOffsetsRequestData.ListOffsetsTopic> topics = ListOffsetsRequest.toListOffsetsTopics(targetTimes);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no strong reason, could we keep fluent pattern?

        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
                .forConsumer(requireTimestamps, isolationLevel)
                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));

.setTargetTimes(topics)
.build(version);
ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert this unrelated change

.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(singletonList(topic))
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(topics)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(singletonList(topic))
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(topics)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.forConsumer(true, IsolationLevel.READ_COMMITTED, false, false)
.setTargetTimes(singletonList(topic))
.forConsumer(true, IsolationLevel.READ_COMMITTED)
.setTargetTimes(topics)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.setCurrentLeaderEpoch(27)).asJava)).asJava
).
build()
val topics = List(new ListOffsetsTopic()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

val topicPartition = new TopicPartition("foo", 0)
val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
val targetTimes = buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.setCurrentLeaderEpoch(15)).asJava)
ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(List(topic).asJava)
val targetTimes = List(topic).asJava
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

public String toString() {
return data.toString();
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

short minVersion = 0;
if (requireTieredStorageTimestamp)
minVersion = 9;
else if (requireEarliestTimestamp)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Could you please take a look?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712 and @frankvicky, What does requireEarliestTimestamp mean? EARLIEST_TIMESTAMP or EARLIEST_LOCAL_TIMESTAMP? If it's EARLIEST_LOCAL_TIMESTAMP, probably we should change the parameter name as requireEarliestLocalTimestamp, so users don't misunderstand it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will update it

@chia7712
Copy link
Copy Markdown
Member

@FrankYang0529 Could you please take a look?

Copy link
Copy Markdown
Member

@FrankYang0529 FrankYang0529 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the fix.

@chia7712 chia7712 merged commit d67c18b into apache:trunk Aug 25, 2024
chia7712 pushed a commit that referenced this pull request Aug 25, 2024
…dSpec (#16876)

Add the version check to client side when building ListOffsetRequest for the specific timestamp:
1) the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
2) the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)

Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
LoganZhuZzz pushed a commit to LoganZhuZzz/kafka that referenced this pull request Aug 28, 2024
…dSpec (apache#16876)

Add the version check to client side when building ListOffsetRequest for the specific timestamp:
1) the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
2) the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)

Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
bboyleonp666 pushed a commit to bboyleonp666/kafka that referenced this pull request Sep 4, 2024
…dSpec (apache#16876)

Add the version check to client side when building ListOffsetRequest for the specific timestamp:
1) the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
2) the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)

Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants