KAFKA-16047: Leverage the fenceProducers timeout in the InitProducerId#15078
KAFKA-16047: Leverage the fenceProducers timeout in the InitProducerId#15078akaltsikis wants to merge 1 commit intoapache:trunkfrom
Conversation
d520a1a to
78de6ab
Compare
| // Set transaction timeout to the equivalent as the fenceProducers request since it's only being initialized to fence out older producers with the same transactional ID, | ||
| // and shouldn't be used for any actual record writes. This has been changed to match the fenceProducers request timeout from one as some brokers may be slower than expected | ||
| // and we need a safe timeout that allows the transaction init to finish. | ||
| .setTransactionTimeoutMs(this.options.timeoutMs()); |
There was a problem hiding this comment.
The timeoutMs is nullable, and is always null when using the default FenceProducersOptions.
When it is null we should use the KafkaAdminClient#defaultApiTimeoutMs, similar to deleteRecords or calcDeadlineMs.
78de6ab to
8d1a1dd
Compare
gharris1727
left a comment
There was a problem hiding this comment.
I ran the ExactlyOnceSourceIntegrationTest, and verified that the execution time did not increase (as desired), but the 90-second default REST Request timeout was used to produce records to the __transaction_state topic.
It appears that even when using a longer transaction timeout, subsequent InitProducerId requests still succeed immediately. Essentially the "fenceProducers" producer ID is being "fenced out" immediately by the initialization of the tasks' producers.
👍
| FenceProducersHandler.newFuture(transactionalIds); | ||
| FenceProducersHandler handler = new FenceProducersHandler(logContext); | ||
| if (options.timeoutMs() == null) { | ||
| options.timeoutMs(defaultApiTimeoutMs); |
There was a problem hiding this comment.
I think it's best if we don't mutate the options if the user passed it in. If someone were to log the options somewhere else, they would see our injected default instead of the fact that no value was provided.
There was a problem hiding this comment.
Thanks for the good comment.
The main reason i mutated the options was to pass the same timeout value in both the FenceProducersHandler and in the invokeDriver.
I just noticed that this is already true as the calcDeadlineMs that is used by the invokeDriver is already returning the defaultApiTimeoutMs in the case of null timeout.
| try (AdminClientUnitTestEnv env = mockClientEnv()) { | ||
| String transactionalId = "copyCat"; | ||
| Node transactionCoordinator = env.cluster().nodes().iterator().next(); | ||
| final FenceProducersOptions options = new FenceProducersOptions().timeoutMs(10000); |
There was a problem hiding this comment.
This doesn't test the null-default case which is going to be more common, and doesn't seem necessary for the correctness of the test either.
I think instead, we should change the request -> request instanceof InitProducerIdRequest to assert that the increased timeout is used, instead of the 1ms timeout. And maybe refactor/parameterize the test to try a default-options and options with an explicit timeout, if we want to test both branches.
|
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
|
This PR seems stale, please see this alternative #16151 |
|
Closing in favor of #16151. Thanks @akaltsikis, I hope you have more time to work on Kafka in the future :) |
KAFKA-16047: Leverage the fenceProducers timeout in the InitProducerId
This is expected to respect the timeout that fenceProducers have, in the InitProducerId request.
Committer Checklist (excluded from commit message)
cc. @gharris1727