KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests#13231
KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests#13231jolshan merged 18 commits intoapache:trunkfrom
Conversation
jeffkbkim
left a comment
There was a problem hiding this comment.
Thanks for the PR! left some comments
|
I've added the changes to the API spec I've added builders to the request and tried to simplify some of the methods. I will update the KIP to reflect some of these changes (especially with respect to the API spec) I still need to address the unstable API change, but that will require a pull from master. |
|
Took a quick look at the unstable api change. Looks like some integration tests built specifically for v4 fail with I will need to look into this. |
@jolshan I suppose that you have to enable unstable apis in your new integration tests. |
guozhangwang
left a comment
There was a problem hiding this comment.
Made a new pass according to the newly updated KIP, do not have further comments.
|
@dajac @hachikuji if you do not have further comments, we can proceed and merge it then? |
| public void handleResponse(AbstractResponse response) { | ||
| AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response; | ||
| Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors(); | ||
| Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID); |
There was a problem hiding this comment.
nit: I suppose that errors should never be null here. I wonder if we should still check it. What do you think?
There was a problem hiding this comment.
What should we do if the check fails? Just have a better error message thrown?
There was a problem hiding this comment.
Yeah, the check is probably not necessary. By the way, I find the idea of having V3_AND_BELOW_TXN_ID for old version a bit confusing. I was wondering if using addPartitionsToTxnResponse.data().resultsByTopicV3AndBelow() would be a better alternative here. We only iterate over the Map so the Map is not strictly required here. Have you considered something like this?
There was a problem hiding this comment.
I was told not to have v3 and below specific methods from Jason because the v3 case should generalize to a single version of the v4 case and that should make it easy to use methods for both.
However, if we really think this is an issue. I guess we can change the approach again. I'm just not sure the experience of errors only applying to v4+. Any ideas there besides changing the method name to express it should only be used in v4+?
There was a problem hiding this comment.
Understood. Let's keep it as it is then.
I agree that v3 case should generalized to a single item of the v4 case. It is just unfortunate that we don't have the transaction id in v3 response so we have to use an empty string for it. I suppose that it is the way it is.
There was a problem hiding this comment.
Yeah. It really is unfortunate. 😞
… for top level errors, minor changes
dajac
left a comment
There was a problem hiding this comment.
LGTM, thanks for the patch. I left two suggestions for follow-ups. We can do those in separate PRs. This one is large enough.
| ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), | ||
| ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errorsByProducerId.get(producerId).get(tp)), | ||
| ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), | ||
| ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)), |
There was a problem hiding this comment.
As a follow-up: We should cover the new version here as well.
There was a problem hiding this comment.
The new version doesn't really use authorizer, so I wasn't sure if it was needed.
There was a problem hiding this comment.
I guess it uses authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
| } | ||
|
|
||
| @Test | ||
| def testBatchedAddPartitionsToTxnRequest(): Unit = { |
There was a problem hiding this comment.
As a follow-up: It seems that the test coverage is pretty low for this API here. It would be great if we could extend it. e.g. authorization failures, validation failures, etc.
|
for follow ups: https://issues.apache.org/jira/browse/KAFKA-14790 |
…mnative#942) Main changes: - Adapt to the new `AddPartitionsToTxnRequest` from apache/kafka#13231 (KIP-890) - Support the new `DescribeTopicPartitions` request from apache/kafka#14612 (KIP-966), which is required by some admin APIs Other changes: - apache/kafka#13760 will retry when `deleteRecords` returns a retriable error, change the error code to `INVALID_REQUEST`
Part 1 of KIP-890
I've updated the API spec and related classes.
Clients should only be able to send up to version 3 requests and that is enforced by using a client builder.
Requests > 4 only require cluster permissions as they are initiated from other brokers.
I've added tests for the batched requests and for the verifyOnly mode.
Also -- minor change to the KafkaApis method to properly match the request name.
Committer Checklist (excluded from commit message)