
KAFKA-19999 Transaction coordinator livelock caused by invalid producer epoch#21176

Merged
chia7712 merged 10 commits into apache:trunk from m1a2st:KAFKA-19999 on Dec 28, 2025

Conversation

@m1a2st
Collaborator

@m1a2st m1a2st commented Dec 19, 2025

In Transaction Version 2, strict epoch validation (markerEpoch > currentEpoch) causes hanging transactions in two scenarios:

  1. Coordinator recovery: When reloading PREPARE_COMMIT/ABORT from
    transaction log, retried markers are rejected with
    InvalidProducerEpochException because they use the same epoch
  2. Network retry: When marker write succeeds but response is lost,
    coordinator retries are rejected for the same reason

Both cases leave transactions permanently hanging in PREPARE state,
causing clients to fail with CONCURRENT_TRANSACTIONS.

Detect idempotent marker retries in
ProducerStateManager.checkProducerEpoch() by checking three
conditions:

  1. Transaction Version ≥ 2
  2. markerEpoch == currentEpoch (same epoch)
  3. currentTxnFirstOffset is empty (transaction already completed)

When all conditions are met, treat the marker as a successful idempotent
retry instead of throwing an error.
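
The three conditions above can be sketched as a small predicate. This is a hypothetical simplification for illustration only; `isIdempotentMarkerRetry` and its parameter names are invented here and do not mirror the exact Kafka source:

```java
import java.util.OptionalLong;

// Illustrative sketch of the idempotent-marker detection described above.
// Not the real ProducerStateManager code; names are hypothetical.
public class IdempotentMarkerCheck {

    // True when a retried end-transaction marker should be treated as an
    // idempotent success instead of raising InvalidProducerEpochException.
    static boolean isIdempotentMarkerRetry(int transactionVersion,
                                           short markerEpoch,
                                           short currentEpoch,
                                           OptionalLong currentTxnFirstOffset) {
        return transactionVersion >= 2            // 1. Transaction Version >= 2
            && markerEpoch == currentEpoch        // 2. same epoch (a retry, not a stale producer)
            && currentTxnFirstOffset.isEmpty();   // 3. no ongoing txn: it already completed
    }

    public static void main(String[] args) {
        // Retried marker after the coordinator already completed the txn: idempotent.
        System.out.println(isIdempotentMarkerRetry(2, (short) 5, (short) 5, OptionalLong.empty()));
        // Same epoch but the transaction is still ongoing: must still be rejected.
        System.out.println(isIdempotentMarkerRetry(2, (short) 5, (short) 5, OptionalLong.of(100L)));
        // Transaction Version 1 keeps the original strict validation.
        System.out.println(isIdempotentMarkerRetry(1, (short) 5, (short) 5, OptionalLong.empty()));
    }
}
```

Note that a normal, valid epoch bump (markerEpoch > currentEpoch) deliberately does not match the predicate; it only flags retries of an already-completed transaction.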

Reviewers: Justine Olshan <jolshan@confluent.io>, TaiJuWu
<tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

@github-actions github-actions bot added the triage (PRs from the community), core (Kafka Broker), storage (Pull requests that target the storage module), and clients labels Dec 19, 2025
@m1a2st m1a2st changed the title from "KAFKA-19999 Transaction coordinator livelock caused by invalid producer epoch" to "KAFKA-19999 Transaction coordinator livelock caused by invalid producer epoch [WIP]" on Dec 19, 2025
@m1a2st
Collaborator Author

m1a2st commented Dec 19, 2025

I ran tests/kafkatest/tests/core/transactions_upgrade_test.py::TransactionsUpgradeTest.test_transactions_upgrade, and all parameterized cases pass.

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.3.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 6 minutes 47.209 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=4.1.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 8 minutes 3.934 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.4.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 7 minutes 44.717 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.5.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 7 minutes 1.438 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.6.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 6 minutes 44.217 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.7.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 7 minutes 44.566 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.9.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 7 minutes 49.853 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.8.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 7 minutes 11.152 seconds

test_id: kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=4.0.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status: PASS
run time: 7 minutes 43.566 seconds

@chia7712 chia7712 requested a review from satishd December 19, 2025 17:25
Member

@chia7712 chia7712 left a comment

@m1a2st thanks for this quick fix.

Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated
@chia7712 chia7712 requested review from jolshan and removed request for satishd December 20, 2025 01:54
@github-actions github-actions bot removed the triage PRs from the community label Dec 20, 2025
@m1a2st m1a2st changed the title from "KAFKA-19999 Transaction coordinator livelock caused by invalid producer epoch [WIP]" to "KAFKA-19999 Transaction coordinator livelock caused by invalid producer epoch" on Dec 20, 2025
    Errors.NOT_LEADER_OR_FOLLOWER
  case error =>
    error

if (exception.isInstanceOf[IdempotentTransactionMarkerException])
Member

please also add comments

Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
@m1a2st
Collaborator Author

m1a2st commented Dec 21, 2025

Thanks for the review, @chia7712; I have addressed all comments.

log.debug("Received duplicate transaction marker for producer {} with epoch {} " +
"but transaction is no longer ongoing, treating as idempotent success",
producerId, producerEpoch);
throw new IdempotentTransactionMarkerException();
Member

Hmm -- I'm not sure how I feel about throwing this error here. Why did we choose to throw an error?

Member

Ideally we would try to avoid an error thrown and control flow checking errors in a case where there is no error.

Member

I agree that using exceptions for control flow is an anti-pattern, especially for success scenarios.

However, the write path for the offset topic appears to lack built-in idempotency logic, so avoiding the exception approach might require more refactoring. Regardless, I agree it is worth the effort to implement this without relying on exceptions.

Collaborator Author

If we simply return early from ProducerAppendInfo#checkProducerEpoch, we still need to prevent duplicate data from being written to the log. The validation happens before the actual append operation, so just returning allows the code to continue and write the duplicate marker anyway.

The general produce path can handle duplicate writes more easily through conditional logic after validation. However, the write path for the offset topic appears to lack built-in idempotency logic. Therefore, throwing an exception provides a clean way to immediately break the append flow once we detect an idempotent retry. The exception propagates up through the call stack, preventing the duplicate write regardless of which code path is handling the marker. The caller can then catch this specific exception type and treat it as a successful operation.

While using exceptions for control flow is not ideal, it's a pragmatic solution given the current code structure.

Member

To avoid using exceptions for control flow, an alternative approach is to simply suppress the InvalidProducerEpochException when the "safe" conditions are met (i.e., the transaction is completed, the protocol is v2, and the producer epochs are identical).

While this results in appending a duplicate marker to the log, it should be acceptable: this scenario is rare and the record size is negligible. Furthermore, both the broker and clients can handle such duplicate markers gracefully.

@m1a2st @jolshan WDYT?

Member

I think it is ok to write a duplicate marker as long as we have the log lock and can't write anything else (ie no transaction will start)

Collaborator Author

> While this results in appending a duplicate marker to the log, it should be acceptable as this scenario is rare and the record size is negligible. Furthermore, both the broker and clients can handle such duplicate markers gracefully

Agreed. Suppressing the exception and allowing the duplicate marker is simpler and more pragmatic. The duplicate marker is harmless and this scenario is rare enough that the tradeoff is worth it for cleaner code.
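
The agreed-upon approach can be sketched as follows. This is a simplified, hypothetical stand-in for the epoch check, not the actual `ProducerStateManager.checkProducerEpoch` body; it shows the error being suppressed (so the harmless duplicate marker falls through to the append) rather than thrown as control flow:

```java
import java.util.OptionalLong;

// Hypothetical sketch of suppressing the epoch error for idempotent
// marker retries, per the discussion above. Names are illustrative.
public class SuppressEpochError {

    public static class InvalidProducerEpochException extends RuntimeException {
        InvalidProducerEpochException(String msg) { super(msg); }
    }

    // Returns normally when the marker may be appended; throws only when
    // the epoch is genuinely invalid.
    static void checkProducerEpoch(int txnVersion, short markerEpoch, short currentEpoch,
                                   OptionalLong currentTxnFirstOffset) {
        if (markerEpoch > currentEpoch)
            return;  // normal TV2 epoch bump: always valid
        if (txnVersion < 2 && markerEpoch == currentEpoch)
            return;  // TV1 allows same-epoch markers as before
        boolean idempotentRetry = txnVersion >= 2
            && markerEpoch == currentEpoch
            && currentTxnFirstOffset.isEmpty();
        if (idempotentRetry)
            return;  // suppress the error; the duplicate marker is harmless
        throw new InvalidProducerEpochException(
            "marker epoch " + markerEpoch + " rejected for current epoch " + currentEpoch);
    }

    public static void main(String[] args) {
        // Idempotent retry on a completed transaction: no exception, marker appended again.
        checkProducerEpoch(2, (short) 5, (short) 5, OptionalLong.empty());
        System.out.println("idempotent retry accepted");
    }
}
```

The design trade-off: no exception-based control flow on the success path, at the cost of an occasional duplicate end-transaction marker in the log.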

Member

Awesome, the code is also looking a lot cleaner. 👍

this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());
}

public PartitionResponse(Throwable exception, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) {
Member

What was the rationale for the changes in this file?

Member

If it is for passing up the error back to the KafkaApis, it is also not ideal as the handling for this exception starts leaking into the client code.

@m1a2st
Collaborator Author

m1a2st commented Dec 24, 2025

Thanks for the reviews, @chia7712 and @jolshan. I ran tests/kafkatest/tests/core/transactions_upgrade_test.py::TransactionsUpgradeTest.test_transactions_upgrade and all test cases pass.

test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.3.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   6 minutes 58.652 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=4.1.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   7 minutes 3.323 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.4.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   7 minutes 3.930 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.5.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   8 minutes 4.982 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.6.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   7 minutes 1.914 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.7.2.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   7 minutes 3.991 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.8.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   7 minutes 55.735 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=3.9.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   7 minutes 48.164 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.core.transactions_upgrade_test.TransactionsUpgradeTest.test_transactions_upgrade.from_kafka_version=4.0.1.metadata_quorum=ISOLATED_KRAFT.group_protocol=None
status:     PASS
run time:   7 minutes 22.213 seconds
----------------------------------------------------------------------------------------------------

assertEquals(OptionalLong.of(99L), stateManager.firstUndecidedOffset());

short markerEpoch = (short) (epoch + 1);
appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 100, transactionVersion);
Member

small question -- why do we do this append in two different ways in the test? The behavior seems mostly the same in this helper vs. the code at line 1106; it's just that the code at line 1106 skips the three steps at the end:

private void appendEndTxnMarker(ProducerStateManager stateManager,
                                long producerId,
                                short producerEpoch,
                                ControlRecordType controlType,
                                long offset,
                                int coordinatorEpoch,
                                long timestamp,
                                short transactionVersion) {
    ProducerAppendInfo producerAppendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.COORDINATOR, time.milliseconds());
    EndTransactionMarker endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch);
    Optional<CompletedTxn> completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp, transactionVersion);
    stateManager.update(producerAppendInfo);
    completedTxn.ifPresent(stateManager::completeTxn);
    stateManager.updateMapEndOffset(offset + 1);
}

Collaborator Author

Thanks for pointing that out! Updated to use the helper method consistently for both appends. This ensures the full cleanup steps are executed and makes the test clearer.

Collaborator

@TaiJuWu TaiJuWu left a comment

LGTM.

@chia7712 chia7712 merged commit f0488eb into apache:trunk Dec 28, 2025
26 checks passed
chia7712 pushed a commit that referenced this pull request Dec 28, 2025
…er epoch (#21176)

@chia7712
Member

@m1a2st thanks for this crucial fix

@jolshan thanks for the reviews

I have merged it to trunk and 4.2


Labels

clients, core (Kafka Broker), small (Small PRs), storage (Pull requests that target the storage module)
