KAFKA-12620 Allocate Producer IDs in KRaft controller#10752
KAFKA-12620 Allocate Producer IDs in KRaft controller#10752cmccabe merged 5 commits intoapache:trunkfrom
Conversation
|
Thanks for this, @mumrah . Very clean PR overall. Can you add a test that the RPC fails if we don't have cluster authorization, in Also, QuorumControllerTest#testSnapshotSaveAndLoad needs to be updated... it's failing now in the PR. |
| AllocateProducerIdsRequestData request) { | ||
| return appendWriteEvent("allocateProducerIds", | ||
| () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())) | ||
| .thenApply(resultOrError -> { |
There was a problem hiding this comment.
I can see the reasoning behind doing it this way, but for all the other RPCs we've just been letting the future complete as an error, and making the caller handle it. One issue with changing the pattern, I suppose, is that not all the controller functions return an RPC data structure that allows setting an error. So let's keep the current pattern here for now, where caller has to handle the future completing exceptionally.
There was a problem hiding this comment.
Sounds fine. I'll change the return type of generateNextProducerId to just be ControllerResult<ProducerIdBlock> and throw ApiExceptions directly instead.
* Adding integration tests for authz * Fixing failing unit test (and adding case for ProducerIdsRecord)
This is part 2 of KIP-730, part 1 was in #10504.
This PR adds support on the KRaft controller for handling AllocateProducerIDs requests and managing the state of the latest producer ID block in the controller and committing this state to the metadata log.