
KAFKA-16713: Define initial set of RPCs for KIP-932 #16022

Merged
omkreddy merged 4 commits into apache:trunk from AndrewJSchofield:KAFKA-16713
Jun 3, 2024

Conversation

@AndrewJSchofield (Member)

This PR defines the initial set of RPCs for KIP-932. The RPCs for the admin client and state management are not in this PR.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dajac (Member) commented May 21, 2024

Should we wait until we cut the 3.8 branch before merging these? If we don't wait, I suggest marking all the new versions as unstable until they are implemented.

@AndrewJSchofield (Member Author)

Yes, we will wait until we cut the 3.8 branch before merging.

@omkreddy omkreddy self-assigned this May 22, 2024
@apoorvmittal10 (Contributor) left a comment

Thanks for the PR. LGTM, minor comments.

*/
package org.apache.kafka.common.errors;

public class FencedStateEpochException extends ApiException {
Contributor

Can we please add comments for this exception class, like the InvalidRecordStateException class defined in the PR? Sorry for missing this earlier, but it seems this error class is not defined in the KIP, although the others are.
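A minimal sketch of what the requested class comment might look like, modeled on the InvalidRecordStateException pattern mentioned above. The javadoc wording and the ApiException stand-in are assumptions for illustration, not the actual Kafka code:

```java
// Stand-in for org.apache.kafka.common.errors.ApiException so this sketch
// compiles on its own; the real class lives in the Kafka clients module.
class ApiException extends RuntimeException {
    public ApiException(String message) {
        super(message);
    }
}

/**
 * The share coordinator rejected the request because the share-group state
 * epoch did not match, meaning the requester has been fenced.
 * (Illustrative wording, following the style of InvalidRecordStateException.)
 */
class FencedStateEpochException extends ApiException {
    public FencedStateEpochException(String message) {
        super(message);
    }
}
```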

);
}

public static List<ShareGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
Contributor

Generally I have received feedback not to include get in such getters, so maybe rename the method to errorDescribedGroupList.

Member Author

This class matches the similar ConsumerGroupDescribeRequest, so I think it's best to leave it as is in the name of consistency.

// Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
//
// Version 6 adds support for share groups (KIP-932).
Contributor

We might want to update the KIP with version 6, as the KIP says version 5 is added for KIP-932.

Member Author

Yes, I will do a KIP update once all of the IDs are confirmed by the merge. Essentially, KIP-890 overtook KIP-932.

Member Author

Done.

@omkreddy (Contributor)

Looks like a compilation failure after KAFKA-7632: Support Compression Levels (KIP-390) (#15516). Can we rebase the PR?

[2024-05-22T13:44:38.423Z] > Task :clients:compileTestJava
[2024-05-22T13:44:38.423Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16022/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:1417: error: no suitable method found for withRecords(CompressionType,SimpleRecord)
[2024-05-22T13:44:38.423Z]         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
[2024-05-22T13:44:38.423Z]                                              ^
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(Compression,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to Compression)
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(Compression,int,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to Compression)
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(byte,Compression,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to byte)
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(long,Compression,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to long)
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(byte,long,Compression,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to byte)
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(long,Compression,Integer,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to long)
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(byte,long,Compression,TimestampType,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to byte)
[2024-05-22T13:44:38.423Z]     method MemoryRecords.withRecords(byte,long,Compression,TimestampType,long,short,int,int,boolean,SimpleRecord...) is not applicable
[2024-05-22T13:44:38.423Z]       (argument mismatch; CompressionType cannot be converted to byte)

@AndrewJSchofield (Member Author)

There are 2 test failures introduced by this PR. I'll get them fixed.

@omkreddy (Contributor) left a comment

@AndrewJSchofield Thanks for the PR. LGTM

@omkreddy (Contributor) commented Jun 3, 2024

@omkreddy omkreddy merged commit 8f82f14 into apache:trunk Jun 3, 2024
@AndrewJSchofield AndrewJSchofield deleted the KAFKA-16713 branch June 3, 2024 07:41
wernerdv pushed a commit to wernerdv/kafka that referenced this pull request Jun 3, 2024
This PR defines the initial set of RPCs for KIP-932. The RPCs for the admin client and state management are not in this PR.

Reviewers: Apoorv Mittal <amittal@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
brenden20 added a commit to brenden20/kafka that referenced this pull request Jun 3, 2024
brenden20 added a commit to brenden20/kafka that referenced this pull request Jun 4, 2024
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
return counts;
}

public LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames) {
Member

Just curious: the cached responseData is created from the input topicNames. It will return the same object even if we pass different input, so I'm not sure how we will use it. If that ShareFetchResponse is shared everywhere, it will be error-prone. If ShareFetchResponse is used as a local variable, the cache seems useless.

Please correct me if I misunderstand anything.

Contributor

I agree, the method will reply with the previously parsed response irrespective of input. However, I see a similar issue in FetchResponse.java. For ShareFetch this method will be used only in tests for now, but either we should fix it or at least write the expectations for the method in the javadoc.

@adixitconfluent @AndrewJSchofield wdyt? I can write the javadoc with expectations in the next PR if everyone agrees.

Member Author

This is true, but it is derived from FetchResponse. @apoorvmittal10's suggestion works for me.

Member

I see similar issue in FetchResponse.java.

Yes, I have filed a JIRA for it: https://issues.apache.org/jira/browse/KAFKA-16684. Also, there is already a PR #15966.

@apoorvmittal10's suggestion works for me.

sounds good to me too
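The caching pitfall discussed in this thread can be sketched with a hypothetical stand-in class (the names and types below are illustrative, not the actual ShareFetchResponse): the result is computed from whatever argument arrives first and then returned unchanged, regardless of later arguments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in illustrating the caching pattern discussed above:
// responseData is computed from the first topicNames argument and cached,
// so later calls with different arguments silently get the first result.
class CachedResponse {
    private Map<String, Integer> responseData; // lazily populated on first call

    Map<String, Integer> responseData(Map<Integer, String> topicNames) {
        if (responseData == null) {
            responseData = new HashMap<>();
            // Invert the id -> name mapping once, from the first input only.
            for (Map.Entry<Integer, String> e : topicNames.entrySet()) {
                responseData.put(e.getValue(), e.getKey());
            }
        }
        return responseData; // same object, even for a different topicNames
    }
}
```

This is why documenting the expectation in the javadoc (or fixing the cache to account for the argument) matters: a second caller with different topicNames gets the first caller's data back.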

@junrao (Contributor) left a comment

@AndrewJSchofield : Thanks for the PR and sorry for the late review. Left a comment below.

"about": "First offset of batch of records to acknowledge."},
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "Last offset (inclusive) of batch of records to acknowledge."},
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
Contributor

@AndrewJSchofield : I am wondering why AcknowledgeTypes is an array instead of just an int8. This forces every record to be acked individually, and I thought we wanted to optimize the case where multiple records have the same ack type.

Member Author

@junrao As it says in the KIP, AcknowledgeTypes can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch. By making it an array, we have the option of optimising with a single entry in the common case where multiple records are of the same ack type, but we can also accommodate the case where they differ simply by adding more entries to the array.
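The two accepted shapes of AcknowledgeTypes described above can be sketched with a hypothetical helper (this is not actual Kafka code; the class and method names are illustrative): a single entry fans out to every offset in the [FirstOffset, LastOffset] range, otherwise one entry per offset is expected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the KIP-932 AcknowledgeTypes semantics
// described above: a single entry applies to every offset in the batch,
// otherwise one entry is expected per offset.
class AckBatch {
    static List<Byte> perOffsetAckTypes(long firstOffset, long lastOffset, List<Byte> acknowledgeTypes) {
        int count = (int) (lastOffset - firstOffset + 1);
        List<Byte> result = new ArrayList<>(count);
        if (acknowledgeTypes.size() == 1) {
            // Common case: one ack type covers the whole batch.
            for (int i = 0; i < count; i++) {
                result.add(acknowledgeTypes.get(0));
            }
        } else if (acknowledgeTypes.size() == count) {
            // Per-offset case: types differ within the batch.
            result.addAll(acknowledgeTypes);
        } else {
            throw new IllegalArgumentException("AcknowledgeTypes must have 1 entry or one per offset");
        }
        return result;
    }
}
```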

@mumrah mumrah added KIP-932 Queues for Kafka and removed queues-for-kafka labels Sep 16, 2024

Labels

KIP-932 Queues for Kafka


7 participants