Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ public void initTransactions() {
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
transactionManager.maybeUpdateTransactionV2Enabled(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private boolean isTransitionValid(State source, State target) {
case UNINITIALIZED:
return source == READY || source == ABORTABLE_ERROR;
case INITIALIZING:
return source == UNINITIALIZED || source == ABORTING_TRANSACTION;
return source == UNINITIALIZED || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
case READY:
return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
case IN_TRANSACTION:
Expand Down Expand Up @@ -312,7 +312,6 @@ public synchronized void beginTransaction() {
throwIfPendingState("beginTransaction");
maybeFailWithError();
transitionTo(State.IN_TRANSACTION);
maybeUpdateTransactionV2Enabled();
}

public synchronized TransactionalRequestResult beginCommit() {
Expand Down Expand Up @@ -348,10 +347,17 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
isTransactionV2Enabled
);

// Maybe update the transaction version here before we enqueue the EndTxn request so there are no races with
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably clarify that this function may set the clientSideEpochBumpRequired on upgrade and that's the reason we do it here, before we call logic that checks clientSideEpochBumpRequired. But not before we build the EndTxnRequest, because we need to complete the transaction with the version it started.

// completion of the EndTxn request. Since this method may update clientSideEpochBumpRequired, we want to update
// before the check below, but we also want to call it after the EndTxnRequest.Builder so we complete the transaction
// with the same version as it started.
maybeUpdateTransactionV2Enabled(false);

EndTxnHandler handler = new EndTxnHandler(builder);
enqueueRequest(handler);

// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
// If we are upgrading to TV2 transactions on the next transaction, also bump the epoch.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of txn requests is enforced by the queue -- endTxn will happen before InitPid because we queue them here. The race condition happens is because at this point the endTxn is executing concurrently, it may or may not finish by the time this line runs by this thread, and therefore it may or may not observe the effects of this line. You can add a sleep before this line and observe the race condition where endTxn finishes before maybeUpdateTransactionV2Enabled is called.

A proper way to do it, is to call this function before enqueuing the request (i.e. move to line 353). Plus add a comment why it's there an not here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To understand the impact of the race condition:

First, we want to complete the transaction. however, if the end txn completes first then we may additionally start a new transaction here. is this correct?

Copy link
Copy Markdown
Member Author

@jolshan jolshan Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race condition involved the previous "state != READY" check that is now removed from the PR. But the idea was that in the race, the EndTxn request would queue and complete before we called the method to maybe upgrade to TV2. In the case that the transaction completes first, we would transition to READY (rather than committing/aborting transaction) and we wouldn't trigger the "upgrade to TV2" (now just clientSideEpochBumpRequired) condition.

Changing the method to not check the state and check on the method parameter eliminated that part of issue, but we also still have the general transition to READY we want to be careful about since we can't easily bump epoch if we are in READY state.

Right now, since we piggyback on clientSideEpochBumpRequired we will always transition to INITIALIZING after completing the transaction since we set that value BEFORE enqueueing the EndTxn request. See

.

TLDR; the issue was not starting a new transaction but ensuring we correctly identify the TV2 upgrade case and bumping the epoch (which could be missed in a race due to idiosyncrasies in the state machine).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, thanks for the correction, seems like i was thinking about the opposite ordering of events. makes sense

if (clientSideEpochBumpRequired) {
return initializeTransactions(this.producerIdAndEpoch);
}
Expand Down Expand Up @@ -437,15 +443,25 @@ public boolean isTransactional() {
return transactionalId != null;
}

// Check all the finalized features from apiVersions to whether the transaction V2 is enabled.
public synchronized void maybeUpdateTransactionV2Enabled() {
/**
* Check all the finalized features from apiVersions to verify whether the transaction V2 is enabled.
* Sets clientSideEpochBumpRequired if upgrading to V2 since we need to bump the epoch.
* This is because V2 no longer adds partitions explicitly and there are some edge cases on upgrade
* that can be avoided by fencing the old V1 transaction epoch. For example, we won't consider
* partitions from the previous transaction as already added to the new V2 transaction if the epoch is fenced.
*/

public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to understand the main ideas:

  • we want to handle cases when v1 producer is completing a transaction + while upgrading to v2
  • for the above case, we need to bump the epoch. otherwise, the next end txn will be fenced (v2 same epoch)

please correct me if i'm missing anything

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly right, but want to clarify a few subtle things.
We want to handle cases where v1 is completing a transaction while upgrading to v2.

In this scenario, since the V1 transaction has the same epoch as the incoming v2 transaction, there are edge cases where we could miss adding partitions to the new V2 transaction while the v1 transaction was still completing. In the case where the partitions were exactly the same (or a subset) we fence the producer on EndTxn.

In another scenario, where we don't send an EndTxn request (say the producer dies) we would have transactional records on partitions that the coordinator doesn't know about (hanging transaction 😱 )

Both scenarios can be avoided by bumping the epoch after v1 commits. Specifically, the producer will get its new bumped epoch before it can even begin the v2 transaction. With the new epoch, it can differentiate between the partitions it added in the v1 transaction and the ones it will need to add for the v2 transaction.

if (latestFinalizedFeaturesEpoch >= apiVersions.getMaxFinalizedFeaturesEpoch()) {
return;
}
ApiVersions.FinalizedFeaturesInfo info = apiVersions.getFinalizedFeaturesInfo();
latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch;
Short transactionVersion = info.finalizedFeatures.get("transaction.version");
boolean wasTransactionV2Enabled = isTransactionV2Enabled;
isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2;
if (!onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled)
clientSideEpochBumpRequired = true;
}

public boolean isTransactionV2Enabled() {
Expand Down Expand Up @@ -1652,6 +1668,8 @@ public void handleResponse(AbstractResponse response) {
// When Transaction Version 2 is enabled, the end txn request 5+ is used,
// it mandates bumping the epoch after every transaction.
// If the epoch overflows, a new producerId is returned with epoch set to 0.
// Note, we still may see EndTxn TV1 (< 5) responses when the producer has upgraded to TV2 due to the upgrade
// occurring at the end of beginCompletingTransaction. The next transaction started should be TV2.
if (endTxnResponse.data().producerId() != -1) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
endTxnResponse.data().producerId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,18 +947,20 @@ public void testTransactionManagerEnablesV2() {

TransactionalRequestResult retryResult = transactionManager.beginCommit();
assertTrue(transactionManager.hasOngoingTransaction());
assertFalse(transactionManager.isTransactionV2Enabled());
assertTrue(transactionManager.isTransactionV2Enabled());

prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ensures that the epoch of when the begin complete transaction started was used right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup exactly. The v1 transaction will have its original epoch (not bumped) and the next InitProducerId response will have the new epoch as expected.

prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
runUntil(() -> !transactionManager.hasOngoingTransaction());
runUntil(retryResult::isCompleted);
retryResult.await();
runUntil(retryResult::isAcked);
assertFalse(transactionManager.hasOngoingTransaction());

// After restart the transaction, the V2 is enabled.
// After restart the transaction, the V2 is still enabled and epoch is bumped.
transactionManager.beginTransaction();
assertTrue(transactionManager.isTransactionV2Enabled());
assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch);
}

@Test
Expand Down Expand Up @@ -4319,6 +4321,7 @@ private void doInitTransactions(long producerId, short epoch) {

prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
runUntil(transactionManager::hasProducerId);
transactionManager.maybeUpdateTransactionV2Enabled(true);

result.await();
assertTrue(result.isSuccessful());
Expand Down