diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java index 26444c07793c7..687d3a5522729 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java @@ -119,6 +119,22 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <= current) : (producerEpoch < current); if (invalidEpoch) { + // TV2 Idempotent Marker Retry Detection (KAFKA-19999): + // When markerEpoch == currentEpoch and no transaction is ongoing, this indicates + // a retry of a marker that was already successfully written. Common scenarios: + // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from transaction log + // 2. Network retry: marker was written but response was lost due to disconnection + // In both cases, the transaction has already ended (currentTxnFirstOffset is empty). + // We suppress the InvalidProducerEpochException and allow the duplicate marker to + // be written to the log. + if (transactionVersion >= 2 && + producerEpoch == current && + updatedEntry.currentTxnFirstOffset().isEmpty()) { + log.info("Idempotent transaction marker retry detected for producer {} epoch {}. " + + "Transaction already completed, allowing duplicate marker write.", + producerId, producerEpoch); + return; + } String comparison = (transactionVersion >= 2) ? "<=" : "<"; String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition + " is " + producerEpoch + ", which is " + comparison + " the last seen epoch " + current + diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index 4d737de4ab40d..b504fb76636d7 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -1090,6 +1090,26 @@ public void testVerificationStateEntryExpiration() { assertNull(stateManager.verificationStateEntry(producerId)); } + @Test + public void testIdempotentTransactionMarkerRetryTV2() { + short transactionVersion = 2; + appendClientEntry(stateManager, producerId, epoch, defaultSequence, 99, true); + short markerEpoch = (short) (epoch + 1); + appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 100, transactionVersion); + + ProducerStateEntry entry = getLastEntryOrElseThrownByProducerId(stateManager, producerId); + assertEquals(markerEpoch, entry.producerEpoch()); + assertEquals(OptionalLong.empty(), entry.currentTxnFirstOffset()); + + assertDoesNotThrow(() -> + appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 101, transactionVersion) + ); + + ProducerStateEntry entryAfterRetry = getLastEntryOrElseThrownByProducerId(stateManager, producerId); + assertEquals(markerEpoch, entryAfterRetry.producerEpoch()); + assertEquals(OptionalLong.empty(), entryAfterRetry.currentTxnFirstOffset()); + } + @Test public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { // Create a verification state entry that supports epoch bump (transactions v2)