Search before reporting
Read release policy
User environment
Issue Description
There is a reentrancy bug in the Pulsar producer send path: pendingMessages.clear() can run after a retry message has already been added to pendingMessages. The retry message is then dropped from the queue, so the retry send's CompletableFuture is never completed.
This can occur when a retry sendAsync is triggered synchronously from within the handle callback of a failed send, while the failing thread still holds the producer mutex.
The failure is delivered by the failPendingMessages method, which usually runs on the timer thread.
Because pendingMessages.clear() is called after completeExceptionally, retry logic such as the code below adds retryMessage to pendingMessages first, and the subsequent clear() then removes it.
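// The first send is expected to fail with a TimeoutException.
CompletableFuture<MessageId> firstSend = producer.sendAsync(message);
CompletableFuture<MessageId> retrySend =
    // handle (not handleAsync) runs the callback synchronously on the thread
    // that fails firstSend, which is what makes the retry reentrant.
    firstSend.handle((msgId, ex) -> {
        assertNotNull(ex, "First send must timeout");
        assertTrue(ex instanceof PulsarClientException.TimeoutException);
        // Retry from inside the failure callback.
        return producer.sendAsync(retryMessage);
    }).thenCompose(f -> f);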
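The ordering problem described above can be shown in isolation with a small, self-contained sketch. ToyProducer, failPending, and the clearBeforeCompleting flag are hypothetical names invented for this illustration; this is not Pulsar's ProducerImpl, and the clear-before-completing variant is only one assumed remedy, not necessarily the fix Pulsar should adopt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class ReentrantClearSketch {

    /** Hypothetical stand-in for a producer; this is not Pulsar's ProducerImpl. */
    static class ToyProducer {
        private final List<CompletableFuture<String>> pending = new ArrayList<>();
        private final boolean clearBeforeCompleting;

        ToyProducer(boolean clearBeforeCompleting) {
            this.clearBeforeCompleting = clearBeforeCompleting;
        }

        /** Registers a pending send; the monitor is reentrant, so a callback may call this. */
        synchronized CompletableFuture<String> sendAsync(String message) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.add(future);
            return future;
        }

        /** Fails every send that was pending when this method was entered. */
        synchronized void failPending(Exception cause) {
            List<CompletableFuture<String>> snapshot = new ArrayList<>(pending);
            if (clearBeforeCompleting) {
                pending.clear(); // assumed remedy: empty the queue before running callbacks
            }
            for (CompletableFuture<String> future : snapshot) {
                future.completeExceptionally(cause); // callbacks run here, on this thread
            }
            if (!clearBeforeCompleting) {
                pending.clear(); // problematic order: also drops sends added by callbacks
            }
        }

        synchronized int pendingCount() {
            return pending.size();
        }
    }

    /** Sends once and retries synchronously from the failure callback. */
    static CompletableFuture<String> sendWithRetry(ToyProducer producer) {
        return producer.sendAsync("message")
                .handle((id, ex) -> producer.sendAsync("retry"))
                .thenCompose(f -> f);
    }

    public static void main(String[] args) {
        for (boolean clearFirst : new boolean[] {false, true}) {
            ToyProducer producer = new ToyProducer(clearFirst);
            CompletableFuture<String> retry = sendWithRetry(producer);
            producer.failPending(new TimeoutException("send timed out"));
            System.out.println("clearBeforeCompleting=" + clearFirst
                    + " retryStillTracked=" + (producer.pendingCount() == 1)
                    + " retryDone=" + retry.isDone());
        }
    }
}
```

With the clear-after ordering, the retry future is removed from the queue before anything can complete it, so it stays pending forever. With the clear-before ordering, the retry remains tracked and a later completion or timeout can still reach it.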
Error messages
Reproducing the issue
Set a low send timeout on the producer and retry synchronously from the failed send's callback, as in the example above.
Additional information
No response
Are you willing to submit a PR?