Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact
boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <= current) : (producerEpoch < current);

if (invalidEpoch) {
// TV2 Idempotent Marker Retry Detection (KAFKA-19999):
// When markerEpoch == currentEpoch and no transaction is ongoing, this indicates
// a retry of a marker that was already successfully written. Common scenarios:
// 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from transaction log
// 2. Network retry: marker was written but response was lost due to disconnection
// In both cases, the transaction has already ended (currentTxnFirstOffset is empty).
// We suppress the InvalidProducerEpochException and allow the duplicate marker to
// be written to the log.
if (transactionVersion >= 2 &&
producerEpoch == current &&
updatedEntry.currentTxnFirstOffset().isEmpty()) {
log.info("Idempotent transaction marker retry detected for producer {} epoch {}. " +
"Transaction already completed, allowing duplicate marker write.",
producerId, producerEpoch);
return;
}
String comparison = (transactionVersion >= 2) ? "<=" : "<";
String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition +
" is " + producerEpoch + ", which is " + comparison + " the last seen epoch " + current +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,26 @@ public void testVerificationStateEntryExpiration() {
assertNull(stateManager.verificationStateEntry(producerId));
}

@Test
public void testIdempotentTransactionMarkerRetryTV2() {
short transactionVersion = 2;
appendClientEntry(stateManager, producerId, epoch, defaultSequence, 99, true);
short markerEpoch = (short) (epoch + 1);
appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 100, transactionVersion);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small question -- why do we do this append in two different ways in the test? It seems like the behavior is mostly the same in this helper vs the code at 1106. It's just the code at 1106 doesn't do three of the steps at the end:

private void appendEndTxnMarker(ProducerStateManager stateManager,
                                    long producerId,
                                    short producerEpoch,
                                    ControlRecordType controlType,
                                    long offset,
                                    int coordinatorEpoch,
                                    long timestamp,
                                    short transactionVersion) {
        ProducerAppendInfo producerAppendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.COORDINATOR, time.milliseconds());
        EndTransactionMarker endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch);
        Optional<CompletedTxn> completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp, transactionVersion);
        stateManager.update(producerAppendInfo);
        completedTxn.ifPresent(stateManager::completeTxn);
        stateManager.updateMapEndOffset(offset + 1);
    }

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out! Updated to use the helper method consistently for both appends. This ensures the full cleanup steps are executed and makes the test clearer.


ProducerStateEntry entry = getLastEntryOrElseThrownByProducerId(stateManager, producerId);
assertEquals(markerEpoch, entry.producerEpoch());
assertEquals(OptionalLong.empty(), entry.currentTxnFirstOffset());

assertDoesNotThrow(() ->
appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 101, transactionVersion)
);

ProducerStateEntry entryAfterRetry = getLastEntryOrElseThrownByProducerId(stateManager, producerId);
assertEquals(markerEpoch, entryAfterRetry.producerEpoch());
assertEquals(OptionalLong.empty(), entryAfterRetry.currentTxnFirstOffset());
}

@Test
public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() {
// Create a verification state entry that supports epoch bump (transactions v2)
Expand Down