diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0229c43cb8b3c..96aa03fb04d72 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -665,6 +665,7 @@ public void initTransactions() { sender.wakeup(); result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); producerMetrics.recordInit(time.nanoseconds() - now); + transactionManager.maybeUpdateTransactionV2Enabled(true); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 13935d483b5be..ab99b3d3e4666 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -211,7 +211,7 @@ private boolean isTransitionValid(State source, State target) { case UNINITIALIZED: return source == READY || source == ABORTABLE_ERROR; case INITIALIZING: - return source == UNINITIALIZED || source == ABORTING_TRANSACTION; + return source == UNINITIALIZED || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case READY: return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case IN_TRANSACTION: @@ -312,7 +312,6 @@ public synchronized void beginTransaction() { throwIfPendingState("beginTransaction"); maybeFailWithError(); transitionTo(State.IN_TRANSACTION); - maybeUpdateTransactionV2Enabled(); } public synchronized TransactionalRequestResult beginCommit() { @@ -348,10 +347,17 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult isTransactionV2Enabled ); + // Maybe update the transaction version here before we enqueue the EndTxn request so there are no races with + // completion of the EndTxn request. Since this method may update clientSideEpochBumpRequired, we want to update + // before the check below, but we also want to call it after the EndTxnRequest.Builder so we complete the transaction + // with the same version as it started. + maybeUpdateTransactionV2Enabled(false); + EndTxnHandler handler = new EndTxnHandler(builder); enqueueRequest(handler); // If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request. + // If we are upgrading to TV2 transactions on the next transaction, also bump the epoch. if (clientSideEpochBumpRequired) { return initializeTransactions(this.producerIdAndEpoch); } @@ -437,15 +443,25 @@ public boolean isTransactional() { return transactionalId != null; } - // Check all the finalized features from apiVersions to whether the transaction V2 is enabled. - public synchronized void maybeUpdateTransactionV2Enabled() { + /** + * Check all the finalized features from apiVersions to verify whether the transaction V2 is enabled. + * Sets clientSideEpochBumpRequired if upgrading to V2 since we need to bump the epoch. + * This is because V2 no longer adds partitions explicitly and there are some edge cases on upgrade + * that can be avoided by fencing the old V1 transaction epoch. For example, we won't consider + * partitions from the previous transaction as already added to the new V2 transaction if the epoch is fenced. + */ + + public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) { if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) { return; } ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo(); latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch; Short transactionVersion = info.finalizedFeatures.get("transaction.version"); + boolean wasTransactionV2Enabled = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; + if (!onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled) + clientSideEpochBumpRequired = true; } public boolean isTransactionV2Enabled() { @@ -1652,6 +1668,8 @@ public void handleResponse(AbstractResponse response) { // When Transaction Version 2 is enabled, the end txn request 5+ is used, // it mandates bumping the epoch after every transaction. // If the epoch overflows, a new producerId is returned with epoch set to 0. + // Note, we still may see EndTxn TV1 (< 5) responses when the producer has upgraded to TV2 due to the upgrade + // occurring at the end of beginCompletingTransaction. The next transaction started should be TV2. if (endTxnResponse.data().producerId() != -1) { ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch( endTxnResponse.data().producerId(), diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a72ce65a6e964..8190579634c60 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -947,18 +947,20 @@ public void testTransactionManagerEnablesV2() { TransactionalRequestResult retryResult = transactionManager.beginCommit(); assertTrue(transactionManager.hasOngoingTransaction()); - assertFalse(transactionManager.isTransactionV2Enabled()); + assertTrue(transactionManager.isTransactionV2Enabled()); prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch); + prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1)); runUntil(() -> !transactionManager.hasOngoingTransaction()); runUntil(retryResult::isCompleted); retryResult.await(); runUntil(retryResult::isAcked); assertFalse(transactionManager.hasOngoingTransaction()); - // After restart the transaction, the V2 is enabled. + // After restart the transaction, the V2 is still enabled and epoch is bumped. transactionManager.beginTransaction(); assertTrue(transactionManager.isTransactionV2Enabled()); + assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch); } @Test @@ -4319,6 +4321,7 @@ private void doInitTransactions(long producerId, short epoch) { prepareInitPidResponse(Errors.NONE, false, producerId, epoch); runUntil(transactionManager::hasProducerId); + transactionManager.maybeUpdateTransactionV2Enabled(true); result.await(); assertTrue(result.isSuccessful());