From e651d260c698d25b4fe03a326a6de95a55f3cb9f Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 20 Dec 2025 00:46:34 +0800 Subject: [PATCH 1/7] temp version --- .../common/requests/ProduceResponse.java | 19 ++++++ .../group/CoordinatorPartitionWriter.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 34 +++++++---- .../scala/kafka/server/ReplicaManager.scala | 4 +- .../IdempotentTransactionMarkerException.java | 26 ++++++++ .../internals/log/ProducerAppendInfo.java | 21 ++++++- .../log/ProducerStateManagerTest.java | 60 +++++++++++++------ 7 files changed, 131 insertions(+), 37 deletions(-) create mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 673b91ac9ab20..3f5843b151a9f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -164,6 +165,7 @@ public static final class PartitionResponse { public List recordErrors; public String errorMessage; public ProduceResponseData.LeaderIdAndEpoch currentLeader; + private Optional exception; public PartitionResponse(Errors error) { this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET); @@ -185,6 +187,19 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch()); } + public PartitionResponse(Optional exception, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) { + this( + exception.isEmpty() ? 
Errors.NONE : Errors.forException(exception.get()), + baseOffset, + logAppendTime, + logStartOffset, + recordErrors, + errorMessage, + new ProduceResponseData.LeaderIdAndEpoch() + ); + this.exception = exception; + } + public PartitionResponse( Errors error, long baseOffset, @@ -203,6 +218,10 @@ public PartitionResponse( this.currentLeader = currentLeader; } + public Throwable exception() { + return exception.orElse(error.exception(errorMessage)); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index 3293782152731..08ff9fa9b50ca 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -158,9 +158,7 @@ class CoordinatorPartitionWriter( val partitionResult = appendResults.getOrElse(topicIdPartition, throw new IllegalStateException(s"Append status $appendResults should have partition $tp.")) - if (partitionResult.error != Errors.NONE) { - throw partitionResult.error.exception() - } + partitionResult.exception.foreach(e => throw e) // Required offset. 
partitionResult.info.lastOffset + 1 diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index dc7194074edc4..6c1391760ac23 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -69,7 +69,7 @@ import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareParti import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.server.transaction.AddPartitionsToTxnManager -import org.apache.kafka.storage.internals.log.AppendOrigin +import org.apache.kafka.storage.internals.log.{AppendOrigin, IdempotentTransactionMarkerException} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.time.Duration @@ -1798,13 +1798,17 @@ class KafkaApis(val requestChannel: RequestChannel, val error = if (exception == null) { Errors.NONE } else { - Errors.forException(exception) match { - case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR => - // The transaction coordinator does not expect those errors so we translate them - // to NOT_LEADER_OR_FOLLOWER to signal to it that the coordinator is not ready yet. - Errors.NOT_LEADER_OR_FOLLOWER - case error => - error + if (Errors.maybeUnwrapException(exception).isInstanceOf[IdempotentTransactionMarkerException]) + Errors.NONE + else { + Errors.forException(exception) match { + case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR => + // The transaction coordinator does not expect those errors so we translate them + // to NOT_LEADER_OR_FOLLOWER to signal to it that the coordinator is not ready yet. 
+ Errors.NOT_LEADER_OR_FOLLOWER + case error => + error + } } } addResultAndMaybeComplete(partition, error) @@ -1831,7 +1835,13 @@ class KafkaApis(val requestChannel: RequestChannel, requestLocal = requestLocal, responseCallback = errors => { errors.foreachEntry { (topicIdPartition, partitionResponse) => - addResultAndMaybeComplete(topicIdPartition.topicPartition(), partitionResponse.error) + val error = if (partitionResponse.error == Errors.NONE) + Errors.NONE + else if (partitionResponse.exception().isInstanceOf[IdempotentTransactionMarkerException]) + Errors.NONE + else + partitionResponse.error + addResultAndMaybeComplete(topicIdPartition.topicPartition(), error) } }, transactionVersion = markerTransactionVersion @@ -2825,11 +2835,11 @@ class KafkaApis(val requestChannel: RequestChannel, val timeoutMs = heartbeatIntervalMs * 2 autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext, timeoutMs) - + // Check for cached topic creation errors only if there's already a MISSING_INTERNAL_TOPICS status - val hasMissingInternalTopicsStatus = responseData.status() != null && + val hasMissingInternalTopicsStatus = responseData.status() != null && responseData.status().stream().anyMatch(s => s.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) - + if (hasMissingInternalTopicsStatus) { val currentTimeMs = time.milliseconds() val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(topicsToCreate.keys.toSet, currentTimeMs) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 4ee24f2e41428..fdc5fc8c8e5e0 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -79,7 +79,7 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong} import java.util.function.Consumer import scala.collection.{Map, Seq, Set, immutable, mutable} import 
scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.RichOptional +import scala.jdk.OptionConverters.{RichOption, RichOptional} /* * Result metadata of a log append operation on the log @@ -881,7 +881,7 @@ class ReplicaManager(val config: KafkaConfig, topicIdPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset new PartitionResponse( - result.error, + result.exception.toJava, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java new file mode 100644 index 0000000000000..b8751d45d7618 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.KafkaException; + +public class IdempotentTransactionMarkerException extends KafkaException { + + public IdempotentTransactionMarkerException() { + super(); + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java index 26444c07793c7..e36ab957001b5 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java @@ -98,18 +98,18 @@ private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offs /** * Validates the producer epoch for transaction markers based on the transaction version. - * + * *

<p>For Transaction Version 2 (TV2) and above, the coordinator always increments * the producer epoch by one before writing the final transaction marker. This establishes a * clear invariant: a valid TV2 marker must have an epoch strictly greater than the producer's * current epoch at the leader. Any marker with markerEpoch <= currentEpoch is a late or duplicate * marker and must be rejected to prevent conflating multiple transactions under the same epoch, * which would threaten exactly-once semantics (EOS) guarantees. - * + *

For legacy transaction versions (TV0/TV1), markers were written with the same epoch as * the transactional records, so we accept markers when markerEpoch >= currentEpoch. This * preserves backward compatibility but cannot distinguish between active and stale markers. - * + * * @param producerEpoch the epoch from the transaction marker * @param offset the offset where the marker will be written * @param transactionVersion the transaction version (0/1 = legacy, 2 = TV2) @@ -119,6 +119,21 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <= current) : (producerEpoch < current); if (invalidEpoch) { + // TV2 Idempotent Retry Detection: + // When markerEpoch == currentEpoch and no transaction is ongoing, this is a retry + // of a marker that was already successfully written. Common scenarios: + // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from transaction log + // 2. Network retry: marker was written but response was lost due to disconnection + // In both cases, the transaction has already ended (currentTxnFirstOffset is empty), + // so we can safely treat this as idempotent success. + if (transactionVersion >= 2 && + producerEpoch == current && + updatedEntry.currentTxnFirstOffset().isEmpty()) { + log.debug("Received duplicate transaction marker for producer {} with epoch {} " + + "but transaction is no longer ongoing, treating as idempotent success", + producerId, producerEpoch); + throw new IdempotentTransactionMarkerException(); + } String comparison = (transactionVersion >= 2) ? 
"<=" : "<"; String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition + " is " + producerEpoch + ", which is " + comparison + " the last seen epoch " + current + diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index 4d737de4ab40d..9b19352c5a393 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -148,7 +148,7 @@ public void testProducerSequenceWrapAround() { short epoch = 15; int sequence = Integer.MAX_VALUE; long offset = 735L; - + appendReplicationEntry(stateManager, epoch, sequence, offset); appendClientEntry(stateManager, producerId, epoch, 0, offset + 500, false); @@ -238,7 +238,7 @@ public void testTxnFirstOffsetMetadataCached() { stateManager.maybeCreateVerificationStateEntry(producerId, defaultSequence, epoch, true)); LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224); - appendInfo.appendDataBatch(epoch, defaultSequence, defaultSequence, + appendInfo.appendDataBatch(epoch, defaultSequence, defaultSequence, time.milliseconds(), firstOffsetMetadata, offset, true); stateManager.update(appendInfo); @@ -791,7 +791,7 @@ public void testFirstUnstableOffset(short transactionVersion) { @Test public void testProducersWithOngoingTransactionsDontExpire() { short epoch = 5; - + appendClientEntry(stateManager, producerId, epoch, defaultSequence, 99, true); assertEquals(OptionalLong.of(99L), stateManager.firstUndecidedOffset()); @@ -831,7 +831,7 @@ public void testOldEpochForControlRecord(short transactionVersion) { assertThrows(InvalidProducerEpochException.class, () -> appendEndTxnMarker(stateManager, producerId, (short) 3, ControlRecordType.COMMIT, 100, transactionVersion)); - + // For TV2, same 
epoch should also be rejected (requires strict >) if (transactionVersion >= 2) { assertThrows(InvalidProducerEpochException.class, @@ -1090,6 +1090,32 @@ public void testVerificationStateEntryExpiration() { assertNull(stateManager.verificationStateEntry(producerId)); } + @Test + public void testIdempotentTransactionMarkerExceptionThrownTV2() { + short transactionVersion = 2; + appendClientEntry(stateManager, producerId, epoch, defaultSequence, 99, true); + assertEquals(OptionalLong.of(99L), stateManager.firstUndecidedOffset()); + + short markerEpoch = (short) (epoch + 1); + appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 100, transactionVersion); + + ProducerStateEntry entry = getLastEntryOrElseThrownByProducerId(stateManager, producerId); + assertEquals(markerEpoch, entry.producerEpoch()); + assertEquals(OptionalLong.empty(), entry.currentTxnFirstOffset()); + + ProducerAppendInfo appendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.COORDINATOR); + EndTransactionMarker endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0); + + // This should throw IdempotentTransactionMarkerException because: + // 1. markerEpoch (1) == currentEpoch (1) + // 2. 
currentTxnFirstOffset is empty (transaction completed) + assertThrows( + IdempotentTransactionMarkerException.class, + () -> appendInfo.appendEndTxnMarker(endTxnMarker, markerEpoch, 101, time.milliseconds(), transactionVersion) + ); + } + + @Test public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { // Create a verification state entry that supports epoch bump (transactions v2) @@ -1099,13 +1125,13 @@ public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { epoch, true ); - + // Verify this is actually transactions v2 assertTrue( verificationEntry.supportsEpochBump(), "Should be using transactions v2 (supports epoch bump)" ); - + // Create ProducerAppendInfo with empty producer state ProducerAppendInfo appendInfo = new ProducerAppendInfo( partition, @@ -1114,7 +1140,7 @@ public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { AppendOrigin.CLIENT, verificationEntry ); - + // Attempting to append with non-zero sequence number should fail for transactions v2 OutOfOrderSequenceException exception = assertThrows( OutOfOrderSequenceException.class, @@ -1126,12 +1152,12 @@ public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { new LogOffsetMetadata(0L), 0L, false ) ); - + assertTrue(exception.getMessage().contains("Expected sequence 0 for " + "transactions v2 idempotent producer" )); assertTrue(exception.getMessage().contains("5 (incoming seq. 
number)")); - + // Attempting to append with sequence 0 should succeed assertDoesNotThrow(() -> appendInfo.appendDataBatch( epoch, @@ -1152,13 +1178,13 @@ public void testAllowNonZeroSequenceForTransactionsV1WithEmptyState() { epoch, false ); - + // Verify this is transactions v1 assertFalse( verificationEntry.supportsEpochBump(), "Should be using transactions v1 (does not support epoch bump)" ); - + // Create ProducerAppendInfo with empty producer state ProducerAppendInfo appendInfo = new ProducerAppendInfo( partition, @@ -1167,7 +1193,7 @@ public void testAllowNonZeroSequenceForTransactionsV1WithEmptyState() { AppendOrigin.CLIENT, verificationEntry ); - + // Attempting to append with non-zero sequence number should succeed for transactions v1 // (our validation should not trigger) assertDoesNotThrow(() -> appendInfo.appendDataBatch( @@ -1185,7 +1211,7 @@ public void testRejectNonZeroSequenceForDirectEpochBump() { appendClientEntry(stateManager, producerId, epoch, 0, 0L, false); appendClientEntry(stateManager, producerId, epoch, 1, 1L, false); appendClientEntry(stateManager, producerId, epoch, 2, 2L, false); - + // Verify initial state ProducerStateEntry initialEntry = getLastEntryOrElseThrownByProducerId(stateManager, producerId); assertEquals(0, initialEntry.producerEpoch()); @@ -1193,7 +1219,7 @@ public void testRejectNonZeroSequenceForDirectEpochBump() { assertFalse(initialEntry.isEmpty()); // Has batch metadata ProducerAppendInfo appendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.CLIENT); - + // Test Case 1: Epoch bump (0 -> 1) with non-zero sequence should be rejected OutOfOrderSequenceException exception = assertThrows(OutOfOrderSequenceException.class, () -> appendInfo.appendDataBatch( @@ -1203,12 +1229,12 @@ public void testRejectNonZeroSequenceForDirectEpochBump() { time.milliseconds(), new LogOffsetMetadata(3L), 3L, false) ); - + assertTrue(exception.getMessage().contains("Invalid sequence number for new epoch")); 
assertTrue(exception.getMessage().contains("1 (request epoch)")); assertTrue(exception.getMessage().contains("5 (seq. number)")); assertTrue(exception.getMessage().contains("0 (current producer epoch)")); - + // Test Case 2: Epoch bump (0 -> 1) with sequence 0 should succeed ProducerAppendInfo appendInfo2 = stateManager.prepareUpdate(producerId, AppendOrigin.CLIENT); assertDoesNotThrow(() -> appendInfo2.appendDataBatch( @@ -1398,7 +1424,7 @@ private void appendEndTxnMarker(ProducerStateManager stateManager, * Calculates the marker epoch for a transaction marker based on the transaction version. * For TV2 and above, the coordinator bumps the epoch (markerEpoch = currentEpoch + 1). * For TV0/TV1, the marker uses the same epoch as transactional records (markerEpoch = currentEpoch). - * + * * @param currentEpoch the current producer epoch * @param transactionVersion the transaction version (1 = TV1, 2 = TV2, etc.) * @return the marker epoch to use From 6189711a7b34d81d8468400502ed4081ba702639 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 20 Dec 2025 14:37:44 +0800 Subject: [PATCH 2/7] addressed by comments --- .../kafka/common/requests/ProduceResponse.java | 12 +++++++----- .../main/scala/kafka/server/KafkaApis.scala | 9 +++++++-- .../scala/kafka/server/ReplicaManager.scala | 4 ++-- .../IdempotentTransactionMarkerException.java | 18 ++++++++++++++++++ 4 files changed, 34 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 3f5843b151a9f..217d40202c4de 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.stream.Collectors; /** @@ -165,7 +164,7 @@ public static final class 
PartitionResponse { public List recordErrors; public String errorMessage; public ProduceResponseData.LeaderIdAndEpoch currentLeader; - private Optional exception; + private Throwable exception; public PartitionResponse(Errors error) { this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET); @@ -187,9 +186,9 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch()); } - public PartitionResponse(Optional exception, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) { + public PartitionResponse(Throwable exception, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) { this( - exception.isEmpty() ? Errors.NONE : Errors.forException(exception.get()), + exception == null ? Errors.NONE : Errors.forException(exception), baseOffset, logAppendTime, logStartOffset, @@ -219,7 +218,10 @@ public PartitionResponse( } public Throwable exception() { - return exception.orElse(error.exception(errorMessage)); + if (exception != null) { + return exception; + } + return error.exception(errorMessage); } @Override diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6c1391760ac23..ed512918256ca 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1798,7 +1798,7 @@ class KafkaApis(val requestChannel: RequestChannel, val error = if (exception == null) { Errors.NONE } else { - if (Errors.maybeUnwrapException(exception).isInstanceOf[IdempotentTransactionMarkerException]) + if (IdempotentTransactionMarkerException.isInstanceOf(exception)) Errors.NONE else { Errors.forException(exception) match { @@ -1837,7 +1837,12 @@ class KafkaApis(val requestChannel: RequestChannel, errors.foreachEntry { 
(topicIdPartition, partitionResponse) => val error = if (partitionResponse.error == Errors.NONE) Errors.NONE - else if (partitionResponse.exception().isInstanceOf[IdempotentTransactionMarkerException]) + // Handle idempotent transaction marker retries (KAFKA-19999): + // For TV2, when a marker with the same epoch arrives and no transaction is ongoing, + // ProducerStateManager throws IdempotentTransactionMarkerException to signal this is + // a benign retry (e.g., coordinator recovery or network disconnection). We treat this + // as success to prevent hanging transactions. + else if (IdempotentTransactionMarkerException.isInstanceOf(partitionResponse.exception())) Errors.NONE else partitionResponse.error diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fdc5fc8c8e5e0..b41c9c4197e11 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -79,7 +79,7 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong} import java.util.function.Consumer import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.{RichOption, RichOptional} +import scala.jdk.OptionConverters.RichOptional /* * Result metadata of a log append operation on the log @@ -881,7 +881,7 @@ class ReplicaManager(val config: KafkaConfig, topicIdPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset new PartitionResponse( - result.exception.toJava, + result.exception.orNull, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java index b8751d45d7618..0dad017effbf6 100644 --- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java @@ -17,10 +17,28 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.protocol.Errors; +/** + * Indicates that a transaction marker was received as part of an idempotent retry + * and should be treated as a successful no-op rather than an error. + * + *

<p>This exception is thrown when: + * <ul> + *   <li>A TV2 transaction marker arrives with the same epoch as current</li> + *   <li>No transaction is currently ongoing (currentTxnFirstOffset is empty)</li> + * </ul> + * + * <p>
Common scenarios include coordinator recovery and network-induced retries. + * Callers should catch this exception and treat it as a successful operation. + */ public class IdempotentTransactionMarkerException extends KafkaException { public IdempotentTransactionMarkerException() { super(); } + + public static boolean isInstanceOf(Throwable t) { + return Errors.maybeUnwrapException(t) instanceof IdempotentTransactionMarkerException; + } } From cebf149cf43a3f4a53fccda86b21388e03efc705 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 22 Dec 2025 00:10:31 +0800 Subject: [PATCH 3/7] addressed by comment --- .../common/requests/ProduceResponse.java | 11 ++++-- .../main/scala/kafka/server/KafkaApis.scala | 4 +++ .../internals/log/ProducerAppendInfo.java | 2 +- .../log/ProducerStateManagerTest.java | 35 +++++++++---------- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 217d40202c4de..248d9057a14bb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -235,12 +235,13 @@ public boolean equals(Object o) { error == that.error && Objects.equals(recordErrors, that.recordErrors) && Objects.equals(errorMessage, that.errorMessage) && - Objects.equals(currentLeader, that.currentLeader); + Objects.equals(currentLeader, that.currentLeader) && + Objects.equals(exception, that.exception); } @Override public int hashCode() { - return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader); + return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader, exception); } @Override @@ -265,6 +266,12 @@ public String toString() { } else { b.append("null"); } + b.append("exception: "); + if (exception != 
null) { + b.append(exception); + } else { + b.append("null"); + } b.append('}'); return b.toString(); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ed512918256ca..bd457ad181e6e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1798,6 +1798,10 @@ class KafkaApis(val requestChannel: RequestChannel, val error = if (exception == null) { Errors.NONE } else { + // Handle idempotent transaction marker retries (KAFKA-19999): + // The group coordinator may throw IdempotentTransactionMarkerException when detecting a marker retry + // (same epoch + transaction already completed). Treat this as success to prevent hanging + // transactions during coordinator recovery or network retries if (IdempotentTransactionMarkerException.isInstanceOf(exception)) Errors.NONE else { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java index e36ab957001b5..185edf10fd18f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java @@ -122,7 +122,7 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact // TV2 Idempotent Retry Detection: // When markerEpoch == currentEpoch and no transaction is ongoing, this is a retry // of a marker that was already successfully written. Common scenarios: - // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from transaction log + // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from the transaction log // 2. 
Network retry: marker was written but response was lost due to disconnection // In both cases, the transaction has already ended (currentTxnFirstOffset is empty), // so we can safely treat this as idempotent success. diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index 9b19352c5a393..9a57404196b4c 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -148,7 +148,7 @@ public void testProducerSequenceWrapAround() { short epoch = 15; int sequence = Integer.MAX_VALUE; long offset = 735L; - + appendReplicationEntry(stateManager, epoch, sequence, offset); appendClientEntry(stateManager, producerId, epoch, 0, offset + 500, false); @@ -238,7 +238,7 @@ public void testTxnFirstOffsetMetadataCached() { stateManager.maybeCreateVerificationStateEntry(producerId, defaultSequence, epoch, true)); LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224); - appendInfo.appendDataBatch(epoch, defaultSequence, defaultSequence, + appendInfo.appendDataBatch(epoch, defaultSequence, defaultSequence, time.milliseconds(), firstOffsetMetadata, offset, true); stateManager.update(appendInfo); @@ -791,7 +791,7 @@ public void testFirstUnstableOffset(short transactionVersion) { @Test public void testProducersWithOngoingTransactionsDontExpire() { short epoch = 5; - + appendClientEntry(stateManager, producerId, epoch, defaultSequence, 99, true); assertEquals(OptionalLong.of(99L), stateManager.firstUndecidedOffset()); @@ -831,7 +831,7 @@ public void testOldEpochForControlRecord(short transactionVersion) { assertThrows(InvalidProducerEpochException.class, () -> appendEndTxnMarker(stateManager, producerId, (short) 3, ControlRecordType.COMMIT, 100, transactionVersion)); - + // 
For TV2, same epoch should also be rejected (requires strict >) if (transactionVersion >= 2) { assertThrows(InvalidProducerEpochException.class, @@ -1115,7 +1115,6 @@ public void testIdempotentTransactionMarkerExceptionThrownTV2() { ); } - @Test public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { // Create a verification state entry that supports epoch bump (transactions v2) @@ -1125,13 +1124,13 @@ public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { epoch, true ); - + // Verify this is actually transactions v2 assertTrue( verificationEntry.supportsEpochBump(), "Should be using transactions v2 (supports epoch bump)" ); - + // Create ProducerAppendInfo with empty producer state ProducerAppendInfo appendInfo = new ProducerAppendInfo( partition, @@ -1140,7 +1139,7 @@ public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { AppendOrigin.CLIENT, verificationEntry ); - + // Attempting to append with non-zero sequence number should fail for transactions v2 OutOfOrderSequenceException exception = assertThrows( OutOfOrderSequenceException.class, @@ -1152,12 +1151,12 @@ public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { new LogOffsetMetadata(0L), 0L, false ) ); - + assertTrue(exception.getMessage().contains("Expected sequence 0 for " + "transactions v2 idempotent producer" )); assertTrue(exception.getMessage().contains("5 (incoming seq. 
number)")); - + // Attempting to append with sequence 0 should succeed assertDoesNotThrow(() -> appendInfo.appendDataBatch( epoch, @@ -1178,13 +1177,13 @@ public void testAllowNonZeroSequenceForTransactionsV1WithEmptyState() { epoch, false ); - + // Verify this is transactions v1 assertFalse( verificationEntry.supportsEpochBump(), "Should be using transactions v1 (does not support epoch bump)" ); - + // Create ProducerAppendInfo with empty producer state ProducerAppendInfo appendInfo = new ProducerAppendInfo( partition, @@ -1193,7 +1192,7 @@ public void testAllowNonZeroSequenceForTransactionsV1WithEmptyState() { AppendOrigin.CLIENT, verificationEntry ); - + // Attempting to append with non-zero sequence number should succeed for transactions v1 // (our validation should not trigger) assertDoesNotThrow(() -> appendInfo.appendDataBatch( @@ -1211,7 +1210,7 @@ public void testRejectNonZeroSequenceForDirectEpochBump() { appendClientEntry(stateManager, producerId, epoch, 0, 0L, false); appendClientEntry(stateManager, producerId, epoch, 1, 1L, false); appendClientEntry(stateManager, producerId, epoch, 2, 2L, false); - + // Verify initial state ProducerStateEntry initialEntry = getLastEntryOrElseThrownByProducerId(stateManager, producerId); assertEquals(0, initialEntry.producerEpoch()); @@ -1219,7 +1218,7 @@ public void testRejectNonZeroSequenceForDirectEpochBump() { assertFalse(initialEntry.isEmpty()); // Has batch metadata ProducerAppendInfo appendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.CLIENT); - + // Test Case 1: Epoch bump (0 -> 1) with non-zero sequence should be rejected OutOfOrderSequenceException exception = assertThrows(OutOfOrderSequenceException.class, () -> appendInfo.appendDataBatch( @@ -1229,12 +1228,12 @@ public void testRejectNonZeroSequenceForDirectEpochBump() { time.milliseconds(), new LogOffsetMetadata(3L), 3L, false) ); - + assertTrue(exception.getMessage().contains("Invalid sequence number for new epoch")); 
assertTrue(exception.getMessage().contains("1 (request epoch)")); assertTrue(exception.getMessage().contains("5 (seq. number)")); assertTrue(exception.getMessage().contains("0 (current producer epoch)")); - + // Test Case 2: Epoch bump (0 -> 1) with sequence 0 should succeed ProducerAppendInfo appendInfo2 = stateManager.prepareUpdate(producerId, AppendOrigin.CLIENT); assertDoesNotThrow(() -> appendInfo2.appendDataBatch( @@ -1424,7 +1423,7 @@ private void appendEndTxnMarker(ProducerStateManager stateManager, * Calculates the marker epoch for a transaction marker based on the transaction version. * For TV2 and above, the coordinator bumps the epoch (markerEpoch = currentEpoch + 1). * For TV0/TV1, the marker uses the same epoch as transactional records (markerEpoch = currentEpoch). - * + * * @param currentEpoch the current producer epoch * @param transactionVersion the transaction version (1 = TV1, 2 = TV2, etc.) * @return the marker epoch to use From c667152bf0f609a9cda0ac7c8f1d46fd25a2816e Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 22 Dec 2025 00:13:29 +0800 Subject: [PATCH 4/7] revert the space --- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++--- .../kafka/storage/internals/log/ProducerAppendInfo.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bd457ad181e6e..471d632a4e789 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2844,11 +2844,11 @@ class KafkaApis(val requestChannel: RequestChannel, val timeoutMs = heartbeatIntervalMs * 2 autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext, timeoutMs) - + // Check for cached topic creation errors only if there's already a MISSING_INTERNAL_TOPICS status - val hasMissingInternalTopicsStatus = responseData.status() != null && + val hasMissingInternalTopicsStatus = 
responseData.status() != null && responseData.status().stream().anyMatch(s => s.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) - + if (hasMissingInternalTopicsStatus) { val currentTimeMs = time.milliseconds() val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(topicsToCreate.keys.toSet, currentTimeMs) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java index 185edf10fd18f..47df43a9026a5 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java @@ -98,18 +98,18 @@ private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offs /** * Validates the producer epoch for transaction markers based on the transaction version. - * + * *

For Transaction Version 2 (TV2) and above, the coordinator always increments * the producer epoch by one before writing the final transaction marker. This establishes a * clear invariant: a valid TV2 marker must have an epoch strictly greater than the producer's * current epoch at the leader. Any marker with markerEpoch <= currentEpoch is a late or duplicate * marker and must be rejected to prevent conflating multiple transactions under the same epoch, * which would threaten exactly-once semantics (EOS) guarantees. - * + * *

For legacy transaction versions (TV0/TV1), markers were written with the same epoch as * the transactional records, so we accept markers when markerEpoch >= currentEpoch. This * preserves backward compatibility but cannot distinguish between active and stale markers. - * + * * @param producerEpoch the epoch from the transaction marker * @param offset the offset where the marker will be written * @param transactionVersion the transaction version (0/1 = legacy, 2 = TV2) From 6ed3a4c8b5562673ccadbf3da33054eab529c907 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 24 Dec 2025 08:58:28 +0800 Subject: [PATCH 5/7] addressed by comments --- .../common/requests/ProduceResponse.java | 32 +--------------- .../group/CoordinatorPartitionWriter.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 37 +++++-------------- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../internals/log/ProducerAppendInfo.java | 19 +++++----- .../log/ProducerStateManagerTest.java | 6 +-- 6 files changed, 26 insertions(+), 74 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 248d9057a14bb..673b91ac9ab20 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -164,7 +164,6 @@ public static final class PartitionResponse { public List recordErrors; public String errorMessage; public ProduceResponseData.LeaderIdAndEpoch currentLeader; - private Throwable exception; public PartitionResponse(Errors error) { this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET); @@ -186,19 +185,6 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch()); } - public PartitionResponse(Throwable 
exception, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) { - this( - exception == null ? Errors.NONE : Errors.forException(exception), - baseOffset, - logAppendTime, - logStartOffset, - recordErrors, - errorMessage, - new ProduceResponseData.LeaderIdAndEpoch() - ); - this.exception = exception; - } - public PartitionResponse( Errors error, long baseOffset, @@ -217,13 +203,6 @@ public PartitionResponse( this.currentLeader = currentLeader; } - public Throwable exception() { - if (exception != null) { - return exception; - } - return error.exception(errorMessage); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -235,13 +214,12 @@ public boolean equals(Object o) { error == that.error && Objects.equals(recordErrors, that.recordErrors) && Objects.equals(errorMessage, that.errorMessage) && - Objects.equals(currentLeader, that.currentLeader) && - Objects.equals(exception, that.exception); + Objects.equals(currentLeader, that.currentLeader); } @Override public int hashCode() { - return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader, exception); + return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader); } @Override @@ -266,12 +244,6 @@ public String toString() { } else { b.append("null"); } - b.append("exception: "); - if (exception != null) { - b.append(exception); - } else { - b.append("null"); - } b.append('}'); return b.toString(); } diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index 08ff9fa9b50ca..3293782152731 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -158,7 +158,9 @@ class CoordinatorPartitionWriter( val 
partitionResult = appendResults.getOrElse(topicIdPartition, throw new IllegalStateException(s"Append status $appendResults should have partition $tp.")) - partitionResult.exception.foreach(e => throw e) + if (partitionResult.error != Errors.NONE) { + throw partitionResult.error.exception() + } // Required offset. partitionResult.info.lastOffset + 1 diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 471d632a4e789..dc7194074edc4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -69,7 +69,7 @@ import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareParti import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.server.transaction.AddPartitionsToTxnManager -import org.apache.kafka.storage.internals.log.{AppendOrigin, IdempotentTransactionMarkerException} +import org.apache.kafka.storage.internals.log.AppendOrigin import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.time.Duration @@ -1798,21 +1798,13 @@ class KafkaApis(val requestChannel: RequestChannel, val error = if (exception == null) { Errors.NONE } else { - // Handle idempotent transaction marker retries (KAFKA-19999): - // The group coordinator may throw IdempotentTransactionMarkerException when detecting a marker retry - // (same epoch + transaction already completed). 
Treat this as success to prevent hanging - // transactions during coordinator recovery or network retries - if (IdempotentTransactionMarkerException.isInstanceOf(exception)) - Errors.NONE - else { - Errors.forException(exception) match { - case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR => - // The transaction coordinator does not expect those errors so we translate them - // to NOT_LEADER_OR_FOLLOWER to signal to it that the coordinator is not ready yet. - Errors.NOT_LEADER_OR_FOLLOWER - case error => - error - } + Errors.forException(exception) match { + case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR => + // The transaction coordinator does not expect those errors so we translate them + // to NOT_LEADER_OR_FOLLOWER to signal to it that the coordinator is not ready yet. + Errors.NOT_LEADER_OR_FOLLOWER + case error => + error } } addResultAndMaybeComplete(partition, error) @@ -1839,18 +1831,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestLocal = requestLocal, responseCallback = errors => { errors.foreachEntry { (topicIdPartition, partitionResponse) => - val error = if (partitionResponse.error == Errors.NONE) - Errors.NONE - // Handle idempotent transaction marker retries (KAFKA-19999): - // For TV2, when a marker with the same epoch arrives and no transaction is ongoing, - // ProducerStateManager throws IdempotentTransactionMarkerException to signal this is - // a benign retry (e.g., coordinator recovery or network disconnection). We treat this - // as success to prevent hanging transactions. 
- else if (IdempotentTransactionMarkerException.isInstanceOf(partitionResponse.exception())) - Errors.NONE - else - partitionResponse.error - addResultAndMaybeComplete(topicIdPartition.topicPartition(), error) + addResultAndMaybeComplete(topicIdPartition.topicPartition(), partitionResponse.error) } }, transactionVersion = markerTransactionVersion diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b41c9c4197e11..4ee24f2e41428 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -881,7 +881,7 @@ class ReplicaManager(val config: KafkaConfig, topicIdPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset new PartitionResponse( - result.exception.orNull, + result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java index 47df43a9026a5..687d3a5522729 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java @@ -119,20 +119,21 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <= current) : (producerEpoch < current); if (invalidEpoch) { - // TV2 Idempotent Retry Detection: - // When markerEpoch == currentEpoch and no transaction is ongoing, this is a retry - // of a marker that was already successfully written. Common scenarios: - // 1. 
Coordinator recovery: reloading PREPARE_COMMIT/ABORT from the transaction log + // TV2 Idempotent Marker Retry Detection (KAFKA-19999): + // When markerEpoch == currentEpoch and no transaction is ongoing, this indicates + // a retry of a marker that was already successfully written. Common scenarios: + // 1. Coordinator recovery: reloading PREPARE_COMMIT/ABORT from transaction log // 2. Network retry: marker was written but response was lost due to disconnection - // In both cases, the transaction has already ended (currentTxnFirstOffset is empty), - // so we can safely treat this as idempotent success. + // In both cases, the transaction has already ended (currentTxnFirstOffset is empty). + // We suppress the InvalidProducerEpochException and allow the duplicate marker to + // be written to the log. if (transactionVersion >= 2 && producerEpoch == current && updatedEntry.currentTxnFirstOffset().isEmpty()) { - log.debug("Received duplicate transaction marker for producer {} with epoch {} " + - "but transaction is no longer ongoing, treating as idempotent success", + log.info("Idempotent transaction marker retry detected for producer {} epoch {}. " + + "Transaction already completed, allowing duplicate marker write.", producerId, producerEpoch); - throw new IdempotentTransactionMarkerException(); + return; } String comparison = (transactionVersion >= 2) ? 
"<=" : "<"; String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition + diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index 9a57404196b4c..4a8554252593f 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -1106,11 +1106,7 @@ public void testIdempotentTransactionMarkerExceptionThrownTV2() { ProducerAppendInfo appendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.COORDINATOR); EndTransactionMarker endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0); - // This should throw IdempotentTransactionMarkerException because: - // 1. markerEpoch (1) == currentEpoch (1) - // 2. currentTxnFirstOffset is empty (transaction completed) - assertThrows( - IdempotentTransactionMarkerException.class, + assertDoesNotThrow( () -> appendInfo.appendEndTxnMarker(endTxnMarker, markerEpoch, 101, time.milliseconds(), transactionVersion) ); } From 5807a5e892787de0af372717e5247387a8753f43 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 24 Dec 2025 08:59:09 +0800 Subject: [PATCH 6/7] remove unused exception --- .../IdempotentTransactionMarkerException.java | 44 ------------------- 1 file changed, 44 deletions(-) delete mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java deleted file mode 100644 index 0dad017effbf6..0000000000000 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/IdempotentTransactionMarkerException.java +++ 
/dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.storage.internals.log; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.protocol.Errors; - -/** - * Indicates that a transaction marker was received as part of an idempotent retry - * and should be treated as a successful no-op rather than an error. - * - *

This exception is thrown when: - *

    - *
  • A TV2 transaction marker arrives with the same epoch as current
  • - *
  • No transaction is currently ongoing (currentTxnFirstOffset is empty)
  • - *
- * - *

Common scenarios include coordinator recovery and network-induced retries. - * Callers should catch this exception and treat it as a successful operation. - */ -public class IdempotentTransactionMarkerException extends KafkaException { - - public IdempotentTransactionMarkerException() { - super(); - } - - public static boolean isInstanceOf(Throwable t) { - return Errors.maybeUnwrapException(t) instanceof IdempotentTransactionMarkerException; - } -} From a14bd3de011f7f18ef7a55f214b3d2da48a2128e Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 25 Dec 2025 09:43:28 +0800 Subject: [PATCH 7/7] addressed by comments --- .../internals/log/ProducerStateManagerTest.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index 4a8554252593f..b504fb76636d7 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -1091,11 +1091,9 @@ public void testVerificationStateEntryExpiration() { } @Test - public void testIdempotentTransactionMarkerExceptionThrownTV2() { + public void testIdempotentTransactionMarkerRetryTV2() { short transactionVersion = 2; appendClientEntry(stateManager, producerId, epoch, defaultSequence, 99, true); - assertEquals(OptionalLong.of(99L), stateManager.firstUndecidedOffset()); - short markerEpoch = (short) (epoch + 1); appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 100, transactionVersion); @@ -1103,12 +1101,13 @@ public void testIdempotentTransactionMarkerExceptionThrownTV2() { assertEquals(markerEpoch, entry.producerEpoch()); assertEquals(OptionalLong.empty(), entry.currentTxnFirstOffset()); - ProducerAppendInfo appendInfo = stateManager.prepareUpdate(producerId, 
AppendOrigin.COORDINATOR); - EndTransactionMarker endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0); - - assertDoesNotThrow( - () -> appendInfo.appendEndTxnMarker(endTxnMarker, markerEpoch, 101, time.milliseconds(), transactionVersion) + assertDoesNotThrow(() -> + appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 101, transactionVersion) ); + + ProducerStateEntry entryAfterRetry = getLastEntryOrElseThrownByProducerId(stateManager, producerId); + assertEquals(markerEpoch, entryAfterRetry.producerEpoch()); + assertEquals(OptionalLong.empty(), entryAfterRetry.currentTxnFirstOffset()); } @Test