KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders #14489

Closed
jolshan wants to merge 5 commits into apache:trunk from jolshan:kafka-15468

jolshan (Member) commented Oct 4, 2023

This PR has two parts:

  1. Redefining the TopicDelta fields to better distinguish when a leader is elected (leader epoch bump) from when a leader has ISR/replica changes (partition epoch bump). There are some cases where we bump the partition epoch but not the leader epoch. In those cases we do not need to run operations that only care about a leader epoch bump (i.e., onElect callbacks).
  2. Even in the case of a leader epoch bump, we may be electing the same leader. In that case, we can skip some operations (such as loading state from disk that is already loaded). The GroupCoordinator already handles this case, but the transaction coordinator does not. I've updated this code so that it does not read from disk, but instead takes the existing metadata and updates the leader epoch, and sends markers with the new epoch.
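
For readers skimming the thread, the distinction in part 1 could be sketched roughly as follows. This is an illustration only, not the code in this PR: `PartitionState`, `Change`, and `classify` are hypothetical names, and the real change lives in the TopicDelta fields (`electedLeaders` and `updatedLeaders` in the diff). The sketch assumes that only partitions led by the local broker are of interest.

```java
public class LeaderChangeSketch {
    // Hypothetical stand-in for a partition's registration; not a Kafka class.
    static final class PartitionState {
        final int leader;
        final int leaderEpoch;
        final int partitionEpoch;

        PartitionState(int leader, int leaderEpoch, int partitionEpoch) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }
    }

    // ELECTED: leader epoch bumped (onElect-style callbacks apply).
    // UPDATED: only the partition epoch bumped (e.g. an ISR or replica change).
    enum Change { NONE, ELECTED, UPDATED }

    static Change classify(PartitionState prev, PartitionState next, int localBrokerId) {
        // Assumption: changes are tracked only for partitions this broker leads.
        if (next.leader != localBrokerId) {
            return Change.NONE;
        }
        // A new partition or a higher leader epoch counts as an election.
        if (prev == null || next.leaderEpoch > prev.leaderEpoch) {
            return Change.ELECTED;
        }
        // Same leader epoch but a higher partition epoch: no election happened.
        if (next.partitionEpoch > prev.partitionEpoch) {
            return Change.UPDATED;
        }
        return Change.NONE;
    }
}
```

Under this sketch, an ISR shrink that bumps only the partition epoch is classified as `UPDATED`, so election-only work such as coordinator loading would not be triggered.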

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Comment thread core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala Outdated
@cmccabe cmccabe added the kraft label Oct 5, 2023
clolov (Contributor) left a comment

Besides some of the suggested cosmetic changes, what you are proposing makes sense to me!

Comment thread metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java Outdated
Comment thread metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
Comment thread core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala Outdated
Comment on lines +29 to +30
private final Map<TopicPartition, PartitionInfo> electedLeaders;
private final Map<TopicPartition, PartitionInfo> updatedLeaders;
Contributor

Does it make sense to change these names to tpToPartitionEpochs and tpToLeaderEpochs? I can anticipate this naming being confusing for a person reading the code for the first time given that what classifies as each is defined in https://github.com/apache/kafka/pull/14489/files#diff-be8b1b8ad296c48bbdc3df55fdb859881f150ceadd0959ebf02fb3caac13ee5aR146-R151

Member Author

I struggled with naming this quite a bit 😅. I was also wondering if I should make it clearer about leader epoch changes vs partition epoch changes. The tricky thing is that the map only contains the leaders that experienced the changes (not followers), so I also wanted to make that clear. I will think on that some more.

Member Author

I pushed changes for the simpler fixes and will continue to think on this one.

clolov (Contributor) left a comment

With or without a change to the leader naming I am happy with the implementation!

@jsancio jsancio self-assigned this Oct 12, 2023
clolov (Contributor) commented Oct 19, 2023

Heya @jolshan! Is there something else I can help with in order for this pull request to make it in trunk?

jsancio (Member) left a comment

Thanks @jolshan. LGTM aside from some formatting issues.

Comment thread metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java Outdated
Comment thread metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java Outdated
Comment thread metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java Outdated
jsancio (Member) commented Oct 25, 2023

@jolshan I started a new build.

jsancio (Member) commented Oct 27, 2023

@jolshan there are failing tests for this PR. Can you take a look when you have time?

jolshan (Member Author) commented Nov 1, 2023

I'm also waiting for some confirmation from @hachikuji about the transaction changes. I will look at the build issues in the meantime.

  // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
- // loading it.
+ // loading it. In the case where the state partition is already loaded, we want to remove inflight markers with the
+ // old epoch.
Contributor

nit: remove inflight markers with the old epoch and replace them with the new epoch?

   * metadata cache and for all the pending markers.
   */
- def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+ def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,
Contributor

nit: The rename seems borderline overkill. I would consider the epoch bump part of transaction loading.

Member Author

Ok. 😅 I think I was trying to distinguish between logical and physical loading. But maybe that is too specific. Do you think it should just keep the original name?

Contributor

Yeah, the original name seems fine to me. We are still loading the transactions. We just have an optimization when we already had state from a previous epoch.
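
The optimization under discussion (keep the original method name, reuse state from a previous epoch instead of re-reading the log) could look roughly like the sketch below. It is a simplified model, not the TransactionStateManager code: `TxnLoadSketch`, `CacheEntry`, `readFromLog`, and `diskReads` are hypothetical, the log read is simulated, and marker handling and locking are left out. It assumes the manager can consult its own cache to decide whether state is already loaded.

```java
import java.util.HashMap;
import java.util.Map;

public class TxnLoadSketch {
    // Hypothetical cache entry: the coordinator epoch plus the loaded metadata.
    static final class CacheEntry {
        int coordinatorEpoch;
        final Map<String, String> metadataByTransactionalId;

        CacheEntry(int coordinatorEpoch, Map<String, String> metadata) {
            this.coordinatorEpoch = coordinatorEpoch;
            this.metadataByTransactionalId = metadata;
        }
    }

    private final Map<Integer, CacheEntry> cache = new HashMap<>();
    int diskReads = 0; // counts simulated log reads, for illustration only

    // Returns true if state was read from the (simulated) log,
    // false if already-loaded state was reused with a bumped epoch.
    boolean loadTransactions(int partitionId, int coordinatorEpoch) {
        CacheEntry existing = cache.get(partitionId);
        if (existing != null) {
            // Same leader re-elected: keep the metadata, only update the epoch.
            existing.coordinatorEpoch = coordinatorEpoch;
            return false;
        }
        cache.put(partitionId, new CacheEntry(coordinatorEpoch, readFromLog(partitionId)));
        return true;
    }

    int epochOf(int partitionId) {
        return cache.get(partitionId).coordinatorEpoch;
    }

    // Stand-in for reading the transaction state partition from disk.
    private Map<String, String> readFromLog(int partitionId) {
        diskReads++;
        return new HashMap<>();
    }
}
```

Because the check happens inside `loadTransactions`, the caller does not need to pass a "state already loaded" flag.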

- txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch,
-   txnMarkerChannelManager.addTxnMarkersToSend)
+ txnManager.maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch,
+   txnMarkerChannelManager.addTxnMarkersToSend, txnManager.txnStateLoaded(txnTopicPartitionId))
Contributor

It's curious that we need to pass the result of txnStateLoaded. Couldn't txnManager figure it out on its own?

s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
Some(loadedTransactions)
} else {
None
Contributor

Perhaps we should have a log message in this path?

def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,
coordinatorEpoch: Int,
sendTxnMarkers: SendTxnMarkersCallback,
transactionStateLoaded: Boolean): Unit = {
Contributor

As mentioned above, I don't think we should pass this as an argument.

On a higher level, I'm trying to figure out the safety of this loading process. Suppose we have two epoch bumps in quick succession. Do we get a strong ordering guarantee given that it is done asynchronously? I think I would expect that we would check for the existence of the partition in loadingPartitions when we first acquire the write lock below. If it exists, then we need to ensure the monotonicity of the epoch. If the entry has a higher epoch, then we ignore the call.
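
The check described above could be sketched as follows. This is a hedged illustration of the idea, not the actual TransactionStateManager logic: `EpochGuardSketch` and `tryBeginLoad` are hypothetical names, and `loadingPartitions` is modeled as a plain map from partition id to epoch, which is a simplification. Following the comment, a call is ignored only when the registered entry has a strictly higher epoch; how to treat an equal epoch is left open here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class EpochGuardSketch {
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    // Simplified model: partition id -> coordinator epoch of the registered load.
    private final Map<Integer, Integer> loadingPartitions = new HashMap<>();

    // Returns true if a load for this epoch should proceed, false if a load
    // with a higher epoch is already registered and this call is stale.
    boolean tryBeginLoad(int partitionId, int coordinatorEpoch) {
        stateLock.writeLock().lock();
        try {
            Integer registered = loadingPartitions.get(partitionId);
            if (registered != null && registered > coordinatorEpoch) {
                return false; // keep the epoch monotonic: ignore the older call
            }
            loadingPartitions.put(partitionId, coordinatorEpoch);
            return true;
        } finally {
            stateLock.writeLock().unlock();
        }
    }

    // Returns the registered epoch, or -1 if no load is registered.
    int loadingEpoch(int partitionId) {
        stateLock.readLock().lock();
        try {
            return loadingPartitions.getOrDefault(partitionId, -1);
        } finally {
            stateLock.readLock().unlock();
        }
    }
}
```

With two epoch bumps in quick succession, whichever asynchronous task acquires the lock second finds the other's entry; if its own epoch is lower, it backs off instead of overwriting newer state.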

jolshan (Member Author) commented Jan 6, 2024

I've been asked to split these changes into the two parts I mentioned. Will follow up with that.

jolshan (Member Author) commented Jan 6, 2024

Part 1 PR here: #15139

jolshan added a commit that referenced this pull request Jan 23, 2024
… loaded leaders (#15139)

This originally was #14489 which covered 2 aspects -- reloading on partition epoch changes where leader epoch did not change and reloading when leader epoch changed but we were already the leader.

I've cut out the second part of the change since the first part is much simpler.

Redefining the TopicDelta fields to better distinguish when a leader is elected (leader epoch bump) from when a leader has ISR/replica changes (partition epoch bump). There are some cases where we bump the partition epoch but not the leader epoch. In those cases we do not need to run operations that only care about a leader epoch bump (i.e., onElect callbacks).

Reviewers: Artem Livshits <alivshits@confluent.io>, José Armando García Sancio <jsancio@apache.org>
@jolshan jolshan marked this pull request as draft January 25, 2024 19:27
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
… loaded leaders (apache#15139)

clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
… loaded leaders (apache#15139)

@jsancio jsancio removed the kraft label Apr 7, 2024
@jsancio jsancio removed their assignment Apr 7, 2024
Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
… loaded leaders (apache#15139)

github-actions (Bot) commented Aug 5, 2024

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Aug 5, 2024
@jolshan jolshan closed this Aug 9, 2024