KAFKA-9235; Ensure transaction coordinator resigns after replica deletion #7963

Merged: hachikuji merged 1 commit into apache:trunk from hachikuji:KAFKA-9235 on Jan 16, 2020

@hachikuji
Contributor

During a reassignment, it can happen that the current leader of a partition is demoted and removed from the replica set at the same time. In this case, we rely on the StopReplica request in order to stop replica fetchers and to clear the group coordinator cache. This patch adds similar logic to ensure that the transaction coordinator state cache also gets cleared.
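
The clearing step described above can be pictured with a minimal sketch. It is not the patch itself: `TxnStateCacheSketch`, `StopReplicaSketch.onStopReplica`, and the plain `String` state values are hypothetical names chosen for illustration. Only the `__transaction_state` topic name is taken from Kafka.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the coordinator's per-partition state cache.
final class TxnStateCacheSketch {
  // partition id of the transaction topic -> (transactionalId -> state)
  private val cache = mutable.Map.empty[Int, mutable.Map[String, String]]

  // Called when this broker becomes the coordinator for the partition.
  def load(partitionId: Int): Unit = cache.synchronized {
    cache.getOrElseUpdate(partitionId, mutable.Map.empty[String, String])
    ()
  }

  // Returns false when this broker does not own the partition.
  def put(partitionId: Int, transactionalId: String, state: String): Boolean =
    cache.synchronized {
      cache.get(partitionId) match {
        case Some(entries) =>
          entries.put(transactionalId, state)
          true
        case None => false
      }
    }

  // Resigning is a synchronous removal of one key; returns true if state was cleared.
  def resign(partitionId: Int): Boolean = cache.synchronized {
    cache.remove(partitionId).isDefined
  }

  def owns(partitionId: Int): Boolean = cache.synchronized(cache.contains(partitionId))
}

object StopReplicaSketch {
  val TxnTopic = "__transaction_state"

  // Hypothetical handler: clear coordinator state only when the replica of a
  // transaction-topic partition is being deleted.
  def onStopReplica(cache: TxnStateCacheSketch,
                    topic: String,
                    partitionId: Int,
                    deletePartition: Boolean): Boolean =
    deletePartition && topic == TxnTopic && cache.resign(partitionId)
}
```

In this sketch a later `put` for the resigned partition returns `false`, which models the broker no longer acting as coordinator for it.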

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji changed the title from "KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion" to "KAFKA-9235; Ensure transaction coordinator resigns after replica deletion" on Jan 15, 2020
private[transaction] val loadingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()

/** partitions of transaction topic that are being removed, state lock should be called BEFORE accessing this set */
private[transaction] val leavingPartitions: mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
Contributor Author

Note I got rid of this because there didn't seem to be a strong reason to do transaction state unloading asynchronously (all we do is remove a key from a map).

assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))),
transactionManager.getTransactionState(transactionalId1))
assertEquals(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1)),
transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata2))
Contributor Author

Minor test bug fix here.

Contributor

@rajinisivaram left a comment

@hachikuji Thanks for the PR, LGTM

@hachikuji
Contributor Author

retest this please

@hachikuji merged commit fbe2e60 into apache:trunk on Jan 16, 2020
hachikuji added a commit that referenced this pull request Jan 16, 2020
…eletion (#7963)

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 21, 2020
Conflicts or compilation errors due to the fact that we temporarily
reverted the commit that removes Scala 2.11 support:

* AclCommand.scala: take upstream changes.
* AclCommandTest.scala: take upstream changes.
* TransactionCoordinatorTest.scala: don't use SAMs, but adjust
mock call to putTransactionStateIfNotExists given new signature.
* TransactionStateManagerTest: use Runnable instead of SAMs.
* PartitionLockTest: use Runnable instead of SAMs.
* docs/upgrade.html: take upstream changes excluding line that
states that Scala 2.11 support has been removed.

* apache-github/trunk: (28 commits)
  KAFKA-9457; Fix flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose (apache#7989)
  MINOR: Update AclCommand help message to match implementation (apache#7990)
  MINOR: Update introduction page in Kafka documentation
  MINOR: Use Math.min for StreamsPartitionAssignor#updateMinReceivedVersion method (apache#7954)
  KAFKA-9338; Fetch session should cache request leader epoch (apache#7970)
  KAFKA-9329; KafkaController::replicasAreValid should return error message (apache#7865)
  KAFKA-9449; Adds support for closing the producer's BufferPool. (apache#7967)
  MINOR: Handle expandIsr in PartitionLockTest and ensure read threads not blocked on write (apache#7973)
  MINOR: Fix typo in connect integration test class name (apache#7976)
  KAFKA-9218: MirrorMaker 2 can fail to create topics (apache#7745)
  KAFKA-8847; Deprecate and remove usage of supporting classes in kafka.security.auth (apache#7966)
  MINOR: Suppress DescribeConfigs Denied log during CreateTopics (apache#7971)
  [MINOR]: Fix typo in Fetcher comment (apache#7934)
  MINOR: Remove unnecessary call to `super` in `MetricConfig` constructor (apache#7975)
  MINOR: fix flaky StreamsUpgradeTestIntegrationTest (apache#7974)
  KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags (apache#7961)
  KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion (apache#7963)
  KAFKA-9410; Make groupId Optional in KafkaConsumer (apache#7943)
  MINOR: Removed accidental double negation in error message. (apache#7834)
  KAFKA-6144: IQ option to query standbys (apache#7962)
  ...
qq619618919 pushed a commit to qq619618919/kafka that referenced this pull request May 12, 2020
…eletion (apache#7963)

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request Apr 15, 2026
…native#631)

### Motivation

We found a lot of logs.

<img width="1716" alt="image"
src="https://github.com/user-attachments/assets/3edf6512-0406-4bf9-a9e4-ac9045899f6f">

We can find these
[logs](https://kibana.streamnative.cloud/goto/158f7e012dc597e91cdd0a23d6d57c38).
If the topic partition does not exist in the `transactionMetadataCache`,
the remove method will return a null object.

We can follow this [PR](apache/kafka#7963) to
improve the logic later.

### Modifications

Add NPE check for removed transaction metadata.
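
A null-safe removal of this kind might look like the sketch below. It assumes the cache is a `java.util.concurrent.ConcurrentHashMap`, whose `remove` returns `null` when the key is absent. The object name, the `String` metadata type, and the `unload` helper are hypothetical and not taken from the commit.

```scala
import java.util.concurrent.ConcurrentHashMap

object RemoveNullCheckSketch {
  // Hypothetical cache: partition id -> metadata (a String stands in for the real type).
  val transactionMetadataCache = new ConcurrentHashMap[Integer, String]()

  // Wrapping the result in Option turns a null (partition not cached) into None,
  // so callers cannot dereference a missing entry by accident.
  def unload(partitionId: Int): Option[String] =
    Option(transactionMetadataCache.remove(Integer.valueOf(partitionId)))
}
```

Callers can then pattern match on the result and log or skip the `None` case instead of failing with a NullPointerException.
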
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request Apr 15, 2026
Fixes streamnative/ksn#635
Fixes streamnative/ksn#634

### Motivation

When multiple loading tasks are submitted for the same partition, only
the first one is executed. This causes a race:
1. A partition is loaded
2. The partition is unloaded
3. The partition is loaded again, return the future of step 1
4. The I/O executor executes the task in step 1
5. The I/O executor executes the task in step 2

Eventually, the loading cache will be cleared in step 5.

### Modifications

Instead of retaining only the first pending task, cancel the cached
task with the same key. This keeps the I/O operations in the same order
in which they were scheduled.
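
The ordering argument can be checked with a small single-threaded model. This is a sketch under stated assumptions, not the `FutureTaskCache` implementation: `PendingTaskQueueSketch`, `submitLoad`, `submitOther`, and `runAll` are hypothetical names, and a plain queue drained by `runAll` stands in for the I/O executor.

```scala
import scala.collection.mutable

// Hypothetical model: a FIFO queue plays the role of the single I/O executor.
final class PendingTaskQueueSketch {
  private final class Task(val loadKey: Option[Int], val body: () => Unit) {
    var cancelled = false
  }

  private val queue = mutable.Queue.empty[Task]
  // Pending load task per key, so that a newer load can cancel the older one.
  private val pendingLoads = mutable.Map.empty[Int, Task]

  // Schedule a load. A still-pending load for the same key is cancelled
  // rather than reused, and the new task is queued behind everything else.
  def submitLoad(key: Int)(body: => Unit): Unit = {
    pendingLoads.get(key).foreach(old => old.cancelled = true)
    val task = new Task(Some(key), () => body)
    pendingLoads.put(key, task)
    queue.enqueue(task)
  }

  // Schedule any other task (for example an unload) in submission order.
  def submitOther(body: => Unit): Unit =
    queue.enqueue(new Task(None, () => body))

  // Drain the queue in FIFO order, skipping cancelled tasks.
  def runAll(): Unit = while (queue.nonEmpty) {
    val task = queue.dequeue()
    task.loadKey.foreach { k =>
      if (pendingLoads.get(k).exists(_ eq task)) pendingLoads.remove(k)
    }
    if (!task.cancelled) task.body()
  }
}
```

With load, unload, load submitted in that order for one key, the first load is cancelled and the queue runs unload and then the second load, so the partition ends up loaded, matching the last request.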

Add `testRepeatedSubmittedReadTasks` to cover this change.

Besides, apply the fixed `FutureTaskCache` to
`TransactionMetadataManager` and follow the latest logic in
apache/kafka#7963