Skip to content

KAFKA-18227: Ensure v2 partitions are not added to last transaction during upgrade#18176

Merged
jolshan merged 8 commits intoapache:trunkfrom
jolshan:kafka-18227
Dec 19, 2024
Merged

KAFKA-18227: Ensure v2 partitions are not added to last transaction during upgrade#18176
jolshan merged 8 commits intoapache:trunkfrom
jolshan:kafka-18227

Conversation

@jolshan
Copy link
Copy Markdown
Member

@jolshan jolshan commented Dec 13, 2024

We want to bump the epoch if we are upgrading to TV2. Given that we already have code in place for this, I thought we could piggyback on the completing transaction epoch bump logic. For just initializing producers, I moved the check to the end of InitTransaction. Note, we have do do this check after we initialize the producer ID to ensure we have updated ApiVersions correctly.

Will add tests to recreate some of the scenarios that caused us to find this bug.

@CalvinLiu7947
Copy link
Copy Markdown
Contributor

Thanks for the change.
It may cause a delay if TV2 is enabled after the previous txn commit. The immediate next txn will still use TV1.
I wonder if we can modify beginTransaction to init producer id instead.

@jolshan
Copy link
Copy Markdown
Member Author

jolshan commented Dec 13, 2024

It may cause a delay if TV2 is enabled after the previous txn commit. The immediate next txn will still use TV1.
I wonder if we can modify beginTransaction to init producer id instead.

I was trying to avoid sending an api in beginTransaction since it doesn't return a result.
But just for my understanding, how would the next transaction use TV1? We wait until the result of beginCompletingTransaction before we return. We also use this approach for the epoch bump and that needs to ensure the producer gets the new epoch before starting the next transaction right?

@jolshan
Copy link
Copy Markdown
Member Author

jolshan commented Dec 13, 2024

I will take a look at the TransactionManagerTest failures. 👍

@CalvinLiu7947
Copy link
Copy Markdown
Contributor

But just for my understanding, how would the next transaction use TV1? We wait until the result of beginCompletingTransaction before we return.

If the TV2 is ready to use during the current txn, before the endTxn request is sent, then the next txn uses TV2. If the TV2 is ready after the current txn's endTxn and before the next Txn, then the next txn is TV1.
Yeah, I agree current approach is simpler than modifying the beginTransaction. I think it is ok as the compatibility is corrected with the current approach.

@jolshan
Copy link
Copy Markdown
Member Author

jolshan commented Dec 16, 2024

return source == READY || source == ABORTABLE_ERROR;
case INITIALIZING:
return source == UNINITIALIZED || source == ABORTING_TRANSACTION;
case UPGRADING:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a separate state? It looks like if we just updated the previous line to allow to transition from COMMITTING_TRANSACTION to INITIALIZING we could just use the clientSideEpochBumpRequired to drive the bump logic.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do that but I didn't know if it could be unsafe if this transition somehow happened in a non-V2 upgrade case. I did consider this approach.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be safe to initialize after commit -- we'd just do an extra epoch bump (and it's in a case if we have a bug that's unknown to us). The thing is that the v1 code path will become mostly dead code in 6 months or so (new client + old broker is a rare combination, much less frequent than old client + new broker) and the value of making it more precise will rapidly diminish, but the complexity will stay.

// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
if (clientSideEpochBumpRequired) {
// If we are upgrading to TV2 transactions on the next transaction, also bump the epoch.
maybeUpdateTransactionV2Enabled();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While unlikely, the transaction can actually complete (it's running concurrently with this code) and set state to READY, then the isUpgradingToV2 logic won't get triggered.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also transitioning the version here also means that we enable V2 before we fully finish transaction, could you add a comment to the EndTxnHandler.handleResponse that we can receive a V1 response even though we've transitioned into a V2 mode.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also should we remove it from the beginTransaction? If we somehow call it from there without going through this path, the upgrade flow won't trigger.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also transitioning the version here also means that we enable V2 before we fully finish transaction, could you add a comment to the EndTxnHandler.handleResponse that we can receive a V1 response even though we've transitioned into a V2 mode.

My understanding is that we do this after we add the EndTxn to the handler. My understanding is that the EndTxn request should always complete before the epoch bump is done (otherwise we would fence ourselves in the epoch bump case) so I thought this was safe, but I can think about this again.

Also should we remove it from the beginTransaction? If we somehow call it from there without going through this path, the upgrade flow won't trigger.

I have removed it from beginTransaction in this PR already.

return source == READY || source == ABORTABLE_ERROR;
case INITIALIZING:
return source == UNINITIALIZED || source == ABORTING_TRANSACTION;
case UPGRADING:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be safe to initialize after commit -- we'd just do an extra epoch bump (and it's in a case if we have a bug that's unknown to us). The thing is that the v1 code path will become mostly dead code in 6 months or so (new client + old broker is a rare combination, much less frequent than old client + new broker) and the value of making it more precise will rapidly diminish, but the complexity will stay.


// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
if (clientSideEpochBumpRequired) {
// If we are upgrading to TV2 transactions on the next transaction, also bump the epoch.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of txn requests is enforced by the queue -- endTxn will happen before InitPid because we queue them here. The race condition happens is because at this point the endTxn is executing concurrently, it may or may not finish by the time this line runs by this thread, and therefore it may or may not observe the effects of this line. You can add a sleep before this line and observe the race condition where endTxn finishes before maybeUpdateTransactionV2Enabled is called.

A proper way to do it, is to call this function before enqueuing the request (i.e. move to line 353). Plus add a comment why it's there an not here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To understand the impact of the race condition:

First, we want to complete the transaction. however, if the end txn completes first then we may additionally start a new transaction here. is this correct?

Copy link
Copy Markdown
Member Author

@jolshan jolshan Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race condition involved the previous "state != READY" check that is now removed from the PR. But the idea was that in the race, the EndTxn request would queue and complete before we called the method to maybe upgrade to TV2. In the case that the transaction completes first, we would transition to READY (rather than committing/aborting transaction) and we wouldn't trigger the "upgrade to TV2" (now just clientSideEpochBumpRequired) condition.

Changing the method to not check the state and check on the method parameter eliminated that part of issue, but we also still have the general transition to READY we want to be careful about since we can't easily bump epoch if we are in READY state.

Right now, since we piggyback on clientSideEpochBumpRequired we will always transition to INITIALIZING after completing the transaction since we set that value BEFORE enqueueing the EndTxn request. See

.

TLDR; the issue was not starting a new transaction but ensuring we correctly identify the TV2 upgrade case and bumping the epoch (which could be missed in a race due to idiosyncrasies in the state machine).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, thanks for the correction, seems like i was thinking about the opposite ordering of events. makes sense

isTransactionV2Enabled
);

// Maybe update the transaction version here before we enqueue the EndTxn request so there are no races with
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably clarify that this function may set the clientSideEpochBumpRequired on upgrade and that's the reason we do it here, before we call logic that checks clientSideEpochBumpRequired. But not before we build the EndTxnRequest, because we need to complete the transaction with the version it started.

return source == READY || source == ABORTABLE_ERROR;
case INITIALIZING:
return source == UNINITIALIZED || source == ABORTING_TRANSACTION;
return source == UNINITIALIZED || source == ABORTING_TRANSACTION || source == COMMITTING_TRANSACTION;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super minor nit: the case below has COMMITTING_TRANSACTION check before the ABORTING_TRANSACTION check.

Copy link
Copy Markdown
Contributor

@artemlivshits artemlivshits left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jolshan
Copy link
Copy Markdown
Member Author

jolshan commented Dec 17, 2024

@jolshan
Copy link
Copy Markdown
Member Author

jolshan commented Dec 18, 2024

There is some issue with testBumpTransactionalEpochWithTV2Disabled I will take a look


// If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
if (clientSideEpochBumpRequired) {
// If we are upgrading to TV2 transactions on the next transaction, also bump the epoch.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To understand the impact of the race condition:

First, we want to complete the transaction. however, if the end txn completes first then we may additionally start a new transaction here. is this correct?

assertFalse(transactionManager.isTransactionV2Enabled());
assertTrue(transactionManager.isTransactionV2Enabled());

prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ensures that the epoch of when the begin complete transaction started was used right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup exactly. The v1 transaction will have its original epoch (not bumped) and the next InitProducerId response will have the new epoch as expected.

* partitions from the previous transaction as already added to the new V2 transaction if the epoch is fenced.
*/

public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatialization) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to understand the main ideas:

  • we want to handle cases when v1 producer is completing a transaction + while upgrading to v2
  • for the above case, we need to bump the epoch. otherwise, the next end txn will be fenced (v2 same epoch)

please correct me if i'm missing anything

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly right, but want to clarify a few subtle things.
We want to handle cases where v1 is completing a transaction while upgrading to v2.

In this scenario, since the V1 transaction has the same epoch as the incoming v2 transaction, there are edge cases where we could miss adding partitions to the new V2 transaction while the v1 transaction was still completing. In the case where the partitions were exactly the same (or a subset) we fence the producer on EndTxn.

In another scenario, where we don't send an EndTxn request (say the producer dies) we would have transactional records on partitions that the coordinator doesn't know about (hanging transaction 😱 )

Both scenarios can be avoided by bumping the epoch after v1 commits. Specifically, the producer will get its new bumped epoch before it can even begin the v2 transaction. With the new epoch, it can differentiate between the partitions it added in the v1 transaction and the ones it will need to add for the v2 transaction.

@jeffkbkim jeffkbkim self-requested a review December 18, 2024 21:18
Copy link
Copy Markdown
Contributor

@jeffkbkim jeffkbkim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. thanks!

@jolshan
Copy link
Copy Markdown
Member Author

jolshan commented Dec 19, 2024

Failing tests are known flakes and unrelated to this PR. I will go ahead and merge + pick to 4.0 branch.

@jolshan jolshan merged commit f7f3cff into apache:trunk Dec 19, 2024
jolshan added a commit that referenced this pull request Dec 19, 2024
…uring upgrade (#18176)

We want to bump the epoch if we are upgrading to TV2. Given that we already have code in place for this, I thought we could piggyback on the completing transaction epoch bump logic. For just initializing producers, I moved the check to the end of InitTransaction. Note, we have do do this check after we initialize the producer ID to ensure we have updated ApiVersions correctly.

Reviewers: Calvin Liu <caliu@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
ijuma added a commit to ijuma/kafka that referenced this pull request Dec 19, 2024
…e-old-protocol-versions

* apache-github/trunk: (25 commits)
  KAFKA-18270: FindCoordinator v0 incorrectly tagged as deprecated (apache#18262)
  KAFKA-18284: Add group coordinator records for Streams rebalance protocol (apache#18228)
  MINOR: Fix flaky state updater test (apache#18253)
  MINOR: improve StreamsResetter logging (apache#18237)
  KAFKA-18227: Ensure v2 partitions are not added to last transaction during upgrade (apache#18176)
  Add IT for share consumer with duration base offet auto reset (apache#18251)
  KAFKA-18283: Add StreamsGroupDescribe RPC definitions (apache#18230)
  KAFKA-18241: add docs check to CI (apache#18183)
  KAFKA-18223 Improve flaky test report (apache#18212)
  MINOR Remove triage label in nightly job (apache#18147)
  KAFKA-18294 Remove deprecated SourceTask#commitRecord (apache#18260)
  KAFKA-18264 Remove NotLeaderForPartitionException (apache#18211)
  KAFKA-13722: Refactor SerdeGetter (apache#18242)
  KAFKA-18094 Remove deprecated TopicListing(String, Boolean) (apache#18248)
  KAFKA-18282: Add StreamsGroupHeartbeat RPC definitions (apache#18227)
  KAFKA-18026: KIP-1112 migrate KTableSuppressProcessorSupplier (apache#18150)
  KAFKA-18026: transition KTable#filter impl to use processor wrapper (apache#18205)
  KAFKA-18293 Remove `org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler` and `org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler` (apache#18244)
  MINOR: add assertion about groupEpoch and targetAssignmentEpoch to testConsumerGroups (apache#18203)
  KAFKA-17960; PlaintextAdminIntegrationTest.testConsumerGroups fails with CONSUMER group protocol (apache#18234)
  ...
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…uring upgrade (apache#18176)

We want to bump the epoch if we are upgrading to TV2. Given that we already have code in place for this, I thought we could piggyback on the completing transaction epoch bump logic. For just initializing producers, I moved the check to the end of InitTransaction. Note, we have do do this check after we initialize the producer ID to ensure we have updated ApiVersions correctly.

Reviewers: Calvin Liu <caliu@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants