From a7520efe6ad5a4025f7c50e697e21a62f2b8b4b4 Mon Sep 17 00:00:00 2001 From: Justine Date: Fri, 13 Dec 2024 12:05:02 -0800 Subject: [PATCH 1/8] Move tv2 checks + add init producer ID if transaction just completed. --- .../kafka/clients/producer/KafkaProducer.java | 1 + .../producer/internals/TransactionManager.java | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0229c43cb8b3c..e42badde2bc05 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -665,6 +665,7 @@ public void initTransactions() { sender.wakeup(); result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); producerMetrics.recordInit(time.nanoseconds() - now); + transactionManager.maybeUpdateTransactionV2Enabled(); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 13935d483b5be..17847ffb9b368 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -312,7 +312,6 @@ public synchronized void beginTransaction() { throwIfPendingState("beginTransaction"); maybeFailWithError(); transitionTo(State.IN_TRANSACTION); - maybeUpdateTransactionV2Enabled(); } public synchronized TransactionalRequestResult beginCommit() { @@ -352,7 +351,8 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult enqueueRequest(handler); // If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request. - if (clientSideEpochBumpRequired) { + // If we are upgrading to TV2 transactions on the next transaction, also bump the epoch. + if (clientSideEpochBumpRequired || maybeUpdateTransactionV2Enabled()) { return initializeTransactions(this.producerIdAndEpoch); } @@ -437,15 +437,21 @@ public boolean isTransactional() { return transactionalId != null; } - // Check all the finalized features from apiVersions to whether the transaction V2 is enabled. - public synchronized void maybeUpdateTransactionV2Enabled() { + /** + * Check all the finalized features from apiVersions to whether the transaction V2 is enabled. + * @return true if updated isTransactionV2Enabled to true -- meaning we are just enabling the TV2 protocol + */ + + public synchronized boolean maybeUpdateTransactionV2Enabled() { if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) { - return; + return false; } ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo(); latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch; Short transactionVersion = info.finalizedFeatures.get("transaction.version"); + boolean previousValue = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; + return !previousValue && isTransactionV2Enabled; } public boolean isTransactionV2Enabled() { From 3e740ece864ae19ff50c4df0e2a3cdcaaa3bf051 Mon Sep 17 00:00:00 2001 From: Justine Date: Fri, 13 Dec 2024 16:47:47 -0800 Subject: [PATCH 2/8] Fix tests, update state machine to account for upgrading --- .../internals/TransactionManager.java | 20 +++++++++++++------ .../internals/TransactionManagerTest.java | 7 +++++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 17847ffb9b368..0b8159245ac81 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -194,11 +194,13 @@ public class TransactionManager { private volatile boolean transactionStarted = false; private volatile boolean clientSideEpochBumpRequired = false; private volatile long latestFinalizedFeaturesEpoch = -1; + private volatile boolean isUpgradingToV2 = false; private volatile boolean isTransactionV2Enabled = false; private enum State { UNINITIALIZED, INITIALIZING, + UPGRADING, READY, IN_TRANSACTION, COMMITTING_TRANSACTION, @@ -212,8 +214,10 @@ private boolean isTransitionValid(State source, State target) { return source == READY || source == ABORTABLE_ERROR; case INITIALIZING: return source == UNINITIALIZED || source == ABORTING_TRANSACTION; + case UPGRADING: + return source == ABORTING_TRANSACTION || source == COMMITTING_TRANSACTION; case READY: - return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; + return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION || source == UPGRADING; case IN_TRANSACTION: return source == READY; case COMMITTING_TRANSACTION: @@ -352,7 +356,8 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult // If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request. // If we are upgrading to TV2 transactions on the next transaction, also bump the epoch. - if (clientSideEpochBumpRequired || maybeUpdateTransactionV2Enabled()) { + maybeUpdateTransactionV2Enabled(); + if (clientSideEpochBumpRequired || isUpgradingToV2) { return initializeTransactions(this.producerIdAndEpoch); } @@ -439,19 +444,19 @@ public boolean isTransactional() { /** * Check all the finalized features from apiVersions to whether the transaction V2 is enabled. - * @return true if updated isTransactionV2Enabled to true -- meaning we are just enabling the TV2 protocol + * Sets isUpgradingToV2 if the previous value was false and now it is true. */ - public synchronized boolean maybeUpdateTransactionV2Enabled() { + public synchronized void maybeUpdateTransactionV2Enabled() { if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) { - return false; + return; } ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo(); latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch; Short transactionVersion = info.finalizedFeatures.get("transaction.version"); boolean previousValue = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; - return !previousValue && isTransactionV2Enabled; + isUpgradingToV2 = currentState != State.READY && !previousValue && isTransactionV2Enabled; } public boolean isTransactionV2Enabled() { @@ -1249,11 +1254,14 @@ boolean canHandleAbortableError() { private void completeTransaction() { if (clientSideEpochBumpRequired) { transitionTo(State.INITIALIZING); + } else if (isUpgradingToV2) { + transitionTo(State.UPGRADING); } else { transitionTo(State.READY); } lastError = null; clientSideEpochBumpRequired = false; + isUpgradingToV2 = false; transactionStarted = false; newPartitionsInTransaction.clear(); pendingPartitionsInTransaction.clear(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a72ce65a6e964..2f03a7c0fb244 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -947,18 +947,20 @@ public void testTransactionManagerEnablesV2() { TransactionalRequestResult retryResult = transactionManager.beginCommit(); assertTrue(transactionManager.hasOngoingTransaction()); - assertFalse(transactionManager.isTransactionV2Enabled()); + assertTrue(transactionManager.isTransactionV2Enabled()); prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch); + prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1)); runUntil(() -> !transactionManager.hasOngoingTransaction()); runUntil(retryResult::isCompleted); retryResult.await(); runUntil(retryResult::isAcked); assertFalse(transactionManager.hasOngoingTransaction()); - // After restart the transaction, the V2 is enabled. + // After restart the transaction, the V2 is still enabled and epoch is bumped. transactionManager.beginTransaction(); assertTrue(transactionManager.isTransactionV2Enabled()); + assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch); } @Test @@ -4319,6 +4321,7 @@ private void doInitTransactions(long producerId, short epoch) { prepareInitPidResponse(Errors.NONE, false, producerId, epoch); runUntil(transactionManager::hasProducerId); + transactionManager.maybeUpdateTransactionV2Enabled(); result.await(); assertTrue(result.isSuccessful()); From 54ad8862894d39acb8fabcea033392124c14d38a Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 17 Dec 2024 09:02:13 -0800 Subject: [PATCH 3/8] Address some comments --- .../org/apache/kafka/clients/producer/KafkaProducer.java | 2 +- .../clients/producer/internals/TransactionManager.java | 8 ++++---- .../producer/internals/TransactionManagerTest.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e42badde2bc05..96aa03fb04d72 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -665,7 +665,7 @@ public void initTransactions() { sender.wakeup(); result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); producerMetrics.recordInit(time.nanoseconds() - now); - transactionManager.maybeUpdateTransactionV2Enabled(); + transactionManager.maybeUpdateTransactionV2Enabled(true); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 0b8159245ac81..eb25ee4223f54 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -356,7 +356,7 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult // If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request. // If we are upgrading to TV2 transactions on the next transaction, also bump the epoch. - maybeUpdateTransactionV2Enabled(); + maybeUpdateTransactionV2Enabled(false); if (clientSideEpochBumpRequired || isUpgradingToV2) { return initializeTransactions(this.producerIdAndEpoch); } @@ -447,16 +447,16 @@ public boolean isTransactional() { * Sets isUpgradingToV2 if the previous value was false and now it is true. */ - public synchronized void maybeUpdateTransactionV2Enabled() { + public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) { if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) { return; } ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo(); latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch; Short transactionVersion = info.finalizedFeatures.get("transaction.version"); - boolean previousValue = isTransactionV2Enabled; + boolean wasTransactionV2Enabled = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; - isUpgradingToV2 = currentState != State.READY && !previousValue && isTransactionV2Enabled; + isUpgradingToV2 = !onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled; } public boolean isTransactionV2Enabled() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 2f03a7c0fb244..8190579634c60 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -4321,7 +4321,7 @@ private void doInitTransactions(long producerId, short epoch) { prepareInitPidResponse(Errors.NONE, false, producerId, epoch); runUntil(transactionManager::hasProducerId); - transactionManager.maybeUpdateTransactionV2Enabled(); + transactionManager.maybeUpdateTransactionV2Enabled(true); result.await(); assertTrue(result.isSuccessful()); From b70fdf2a0199d4ce7eeb4186d1049fd95dd37d15 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 17 Dec 2024 10:40:26 -0800 Subject: [PATCH 4/8] Add comment --- .../kafka/clients/producer/internals/TransactionManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index eb25ee4223f54..49cebf8b985b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1666,6 +1666,8 @@ public void handleResponse(AbstractResponse response) { // When Transaction Version 2 is enabled, the end txn request 5+ is used, // it mandates bumping the epoch after every transaction. // If the epoch overflows, a new producerId is returned with epoch set to 0. + // Note, we still may see EndTxn TV1 (< 5) responses when the producer has upgraded to TV2 due to the upgrade + // occurring at the end of beginCompletingTransaction. The next transaction started should be TV2. if (endTxnResponse.data().producerId() != -1) { ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch( endTxnResponse.data().producerId(), From f27873da31819fce34c8254625ca8c879be626a0 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 17 Dec 2024 12:09:38 -0800 Subject: [PATCH 5/8] Remove extra state, move check to avoid races. --- .../internals/TransactionManager.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 49cebf8b985b6..4ada4615d392d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -194,13 +194,11 @@ public class TransactionManager { private volatile boolean transactionStarted = false; private volatile boolean clientSideEpochBumpRequired = false; private volatile long latestFinalizedFeaturesEpoch = -1; - private volatile boolean isUpgradingToV2 = false; private volatile boolean isTransactionV2Enabled = false; private enum State { UNINITIALIZED, INITIALIZING, - UPGRADING, READY, IN_TRANSACTION, COMMITTING_TRANSACTION, @@ -213,11 +211,9 @@ private boolean isTransitionValid(State source, State target) { case UNINITIALIZED: return source == READY || source == ABORTABLE_ERROR; case INITIALIZING: - return source == UNINITIALIZED || source == ABORTING_TRANSACTION; - case UPGRADING: - return source == ABORTING_TRANSACTION || source == COMMITTING_TRANSACTION; + return source == UNINITIALIZED || source == ABORTING_TRANSACTION || source == COMMITTING_TRANSACTION; case READY: - return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION || source == UPGRADING; + return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case IN_TRANSACTION: return source == READY; case COMMITTING_TRANSACTION: @@ -351,13 +347,16 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult isTransactionV2Enabled ); + // Maybe update the transaction version here before we enqueue the EndTxn request so there are no races with + // completion of the EndTxn request. + maybeUpdateTransactionV2Enabled(false); + EndTxnHandler handler = new EndTxnHandler(builder); enqueueRequest(handler); // If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request. // If we are upgrading to TV2 transactions on the next transaction, also bump the epoch. - maybeUpdateTransactionV2Enabled(false); - if (clientSideEpochBumpRequired || isUpgradingToV2) { + if (clientSideEpochBumpRequired) { return initializeTransactions(this.producerIdAndEpoch); } @@ -456,7 +455,7 @@ public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatializa Short transactionVersion = info.finalizedFeatures.get("transaction.version"); boolean wasTransactionV2Enabled = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; - isUpgradingToV2 = !onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled; + clientSideEpochBumpRequired = !onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled; } public boolean isTransactionV2Enabled() { @@ -1254,14 +1253,11 @@ boolean canHandleAbortableError() { private void completeTransaction() { if (clientSideEpochBumpRequired) { transitionTo(State.INITIALIZING); - } else if (isUpgradingToV2) { - transitionTo(State.UPGRADING); } else { transitionTo(State.READY); } lastError = null; clientSideEpochBumpRequired = false; - isUpgradingToV2 = false; transactionStarted = false; newPartitionsInTransaction.clear(); pendingPartitionsInTransaction.clear(); From b85f758b6f1eb006d7dc99c4d5e202031664839d Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 17 Dec 2024 14:14:09 -0800 Subject: [PATCH 6/8] Adding comments --- .../producer/internals/TransactionManager.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 4ada4615d392d..634098b495f44 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -211,7 +211,7 @@ private boolean isTransitionValid(State source, State target) { case UNINITIALIZED: return source == READY || source == ABORTABLE_ERROR; case INITIALIZING: - return source == UNINITIALIZED || source == ABORTING_TRANSACTION || source == COMMITTING_TRANSACTION; + return source == UNINITIALIZED || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case READY: return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case IN_TRANSACTION: @@ -348,7 +348,9 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult ); // Maybe update the transaction version here before we enqueue the EndTxn request so there are no races with - // completion of the EndTxn request. + // completion of the EndTxn request. Since this method may update clientSideEpochBumpRequired, we want to update + // before the check below, but we also want to call it after the EndTxnRequest.Builder so we complete the transaction + // with the same version as it started. maybeUpdateTransactionV2Enabled(false); EndTxnHandler handler = new EndTxnHandler(builder); @@ -443,7 +445,10 @@ public boolean isTransactional() { /** * Check all the finalized features from apiVersions to whether the transaction V2 is enabled. - * Sets isUpgradingToV2 if the previous value was false and now it is true. + * Sets clientSideEpochBumpRequired if upgrading to V2 since we need to bump the epoch. + * This is because V2 no longer adds partitions explicitly and there are some edge cases on upgrade + * that can be avoided by fencing the old V1 transaction epoch. For example, we won't consider + * partitions from the previous transaction as already added to the new V2 transaction if the epoch is fenced. */ public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) { From cb99d6ca8a3e2deae64e398578aef612096c6ec7 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 17 Dec 2024 18:16:13 -0800 Subject: [PATCH 7/8] Fix bug -- we don't want to remove true value for clientEpochBump --- .../kafka/clients/producer/internals/TransactionManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 634098b495f44..6efdae25955a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -460,7 +460,8 @@ public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatializa Short transactionVersion = info.finalizedFeatures.get("transaction.version"); boolean wasTransactionV2Enabled = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; - clientSideEpochBumpRequired = !onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled; + if (!onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled) + clientSideEpochBumpRequired = true; } public boolean isTransactionV2Enabled() { From cb40d57cee179d873da7a54eb391909dd324016b Mon Sep 17 00:00:00 2001 From: Justine Date: Wed, 18 Dec 2024 13:48:40 -0800 Subject: [PATCH 8/8] Small nit --- .../kafka/clients/producer/internals/TransactionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 6efdae25955a1..ab99b3d3e4666 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -444,7 +444,7 @@ public boolean isTransactional() { } /** - * Check all the finalized features from apiVersions to whether the transaction V2 is enabled. + * Check all the finalized features from apiVersions to verify whether the transaction V2 is enabled. * Sets clientSideEpochBumpRequired if upgrading to V2 since we need to bump the epoch. * This is because V2 no longer adds partitions explicitly and there are some edge cases on upgrade * that can be avoided by fencing the old V1 transaction epoch. For example, we won't consider