KAFKA-6446: KafkaProducer should use timed version of await to avoid endless waiting#4563
KAFKA-6446: KafkaProducer should use timed version of await to avoid endless waiting#4563hachikuji merged 9 commits intoapache:trunkfrom
await to avoid endless waiting#4563Conversation
…trap server is down https://issues.apache.org/jira/browse/KAFKA-6446 Replaced await() with timed version to avoid endless waiting and refined the code to have Sender thread able to exit from infinitely connecting the `bad` broker.
|
@apurvam Please kindly review. Thanks. |
| sender.wakeup(); | ||
| result.await(); | ||
| try { | ||
| if (!result.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { |
There was a problem hiding this comment.
I think we should also transition to a fatal error state if we can't init successfully. This will ensure that the only other operation you can do is close. We should also add a test case to ensure that operations other than close are not allowed if initTransactions failed.
There was a problem hiding this comment.
@apurvam Do you mean we should throw a fatal error instead of a TimeoutException?
There was a problem hiding this comment.
We should probably use the producer's max block time instead of the request timeout.
|
@apurvam Thanks for the comments. Please review again. |
|
@becketqin Please take some time to review this patch. Thanks. |
|
@hachikuji Could you help review this patch? Thanks. |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the patch. Left a few comments.
| sender.wakeup(); | ||
| result.await(); | ||
| try { | ||
| if (!result.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { |
There was a problem hiding this comment.
We should probably use the producer's max block time instead of the request timeout.
| if (!result.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { | ||
| transactionManager.transitionToFatalError( | ||
| new TimeoutException("Timeout expired while initializing the transaction in " + requestTimeoutMs + "ms.")); | ||
| throw new FatalExitError(); |
There was a problem hiding this comment.
I don't think this this is what we want. We're not using an Error anywhere else in the producer. I'd suggest we just throw TimeoutException, but it is a RetriableException, which would be misleading if we do not allow retrying. We could either introduce a FatalTimeoutException, or we could try to make this API safe to retry. For example, to implement the latter, we could cache the result object so that on retry, we continue waiting for it.
There was a problem hiding this comment.
IMO, the retry here should be implemented by the users instead of the producer itself, right? If this is the case, can we simply call initTransactions again after we catch the thrown TimeoutException. In doing so can we avoid to cache the result object. Does it make sense?
There was a problem hiding this comment.
Yes, I am suggesting that we allow the user to retry after a timeout. The simplest way to do so is to cache the result object so that we do not send another InitProducerId request. Instead, we should just continue waiting on the one that we already sent.
There was a problem hiding this comment.
Please confirm: if the retry happens outside initTransactions, in order to return the cached result, we'll change the signature of this method which might need a formal-approving process. However, if we do the retry from within initTransactions, we have to figure out the total retry count ahead of time which still needs a formal discussion. Am I correct?
There was a problem hiding this comment.
Let me clarify what I meant. In TransactionManager.initializeTransactions, we return a TransactionalRequestResult, which we wait on from initTransactions(). What I am suggesting is that we could cache the instance of TransactionalRequestResult inside TransactionManager; if initTransactions() times out and is invoked again, we can just continue waiting on the same result object. So it does not change the API.
| try { | ||
| if (!result.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { | ||
| transactionManager.transitionToFatalError( | ||
| new TimeoutException("Timeout expired while initializing the transaction in " + requestTimeoutMs + "ms.")); |
There was a problem hiding this comment.
This should probably say "Timeout expired while initializing transactional state ..."
| client.send(clientRequest, now); | ||
| return true; | ||
| } | ||
| break; // break the loop if we failed to find a specific node |
There was a problem hiding this comment.
I am not sure we always want to do break if we cannot find a node to send the request to. Other than the bootstrapping case, we typically just want to refresh metadata and try again. I think I'd suggest we leave this logic as is, but add a condition to the loop to check for producer shutdown.
… FatorExitError and changed to a conditional loop
|
@hachikuji Please review again. Thanks. |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, one more comment.
| InitProducerIdHandler handler = new InitProducerIdHandler(builder); | ||
| enqueueRequest(handler); | ||
| return handler.result; | ||
| if (transactionalRequestResult == null || transactionalRequestResult.isCompleted()) { |
There was a problem hiding this comment.
In the current code, calling initializeTransactions more than once causes an illegal state. I think we should preserve that semantic in spite of retries after timeouts. In other words, the only situation where you're allowed to retry a call to initializeTransactions is after a timeout. Once it returns successfully for the user, we go back to the current behavior and raise an illegal state.
We're almost there with this patch, but the user could see the illegal state before a successful call because of the isCompleted check here. To implement the semantics we want, I think we may have to do the following:
- Move the caching of the initTransactions result into
KafkaProducer. - On the first invocation, we cache the result.
- If there is a timeout, we continue waiting on the result.
- Once the result completes successfully, we should return and set the result to null.
Once the cached result is cleared, if the user tries to initializeTransactions again, they will get the current illegal state error, which is what we want.
There was a problem hiding this comment.
Moving the cache result to KafkaProducer has to change the signature of initTransactions which might need a KIP to have it discussed. Is that true?
There was a problem hiding this comment.
What I mean is that we would create a private field for the cached result in KafkaProducer, so the signature does not change. The reason I am suggesting we move it there is that that is where we are doing the wait, which means we know when the operation has been successfully completed from the user's perspective.
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the updates. Looking good, but had a couple more comments.
| private final ProducerInterceptors<K, V> interceptors; | ||
| private final ApiVersions apiVersions; | ||
| private final TransactionManager transactionManager; | ||
| private TransactionalRequestResult transactionalRequestResult; |
There was a problem hiding this comment.
nit: we only use this for initializeTransactions, so maybe the name could be more specific? Say initTransactionsResult?
| * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured | ||
| * transactional.id is not authorized. See the exception for more details | ||
| * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error | ||
| * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>. |
There was a problem hiding this comment.
Can you add a comment to the javadoc mentioning that this method may be retried if a TimeoutException or an InterruptException is raised?
| private def createTransactionalProducerToConnectNonExistentBrokers(): KafkaProducer[Array[Byte], Array[Byte]] = { | ||
| val props = new Properties() | ||
| props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction-id") | ||
| val producer = TestUtils.createNewProducer(brokerList = "192.168.1.1:9092", maxBlockMs = 1000, |
There was a problem hiding this comment.
Is there a way we can test this without depending on an IP directly like this? I don't think we can assume that the builds will always be sandboxed. I actually think KafkaProducerTest might be a better place since it lets us mock the network layer.
|
retest it please |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the patch, LGTM. I made a few minor tweaks to the new test cases so that they depended on a mocked network (and ran faster). There are a few additional test paths to hit, but they are a little harder. I will try to address them in an upcoming PR.
https://issues.apache.org/jira/browse/KAFKA-6446
Replaced await() with timed version to avoid endless waiting and refined the code to have Sender thread able to exit from infinitely connecting the
badbroker.More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)