From 25a1108a4c81015569c0b4c6bb7ec7014cae901f Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Fri, 20 Feb 2026 15:31:55 +0800 Subject: [PATCH 01/10] KAFKA-19562: Replace hand-written AbortedTxn with generated protocol Replace the manually maintained AbortedTxn class with a generated protocol message defined in AbortedTxn.json. The generated class provides the same wire format (2-byte version prefix + 4 int64 fields) ensuring binary compatibility with existing transaction index files. --- .../resources/common/message/AbortedTxn.json | 35 ++++++ .../scala/unit/kafka/log/LogCleanerTest.scala | 7 +- .../scala/unit/kafka/log/LogLoaderTest.scala | 9 +- .../scala/unit/kafka/log/LogTestUtils.scala | 3 +- .../scala/unit/kafka/log/UnifiedLogTest.scala | 11 +- .../log/remote/storage/RemoteLogManager.java | 6 +- .../storage/internals/log/AbortedTxn.java | 117 ------------------ .../log/CleanedTransactionMetadata.java | 1 + .../kafka/storage/internals/log/Cleaner.java | 1 + .../kafka/storage/internals/log/LocalLog.java | 5 +- .../storage/internals/log/LogSegment.java | 7 +- .../internals/log/TransactionIndex.java | 39 +++--- .../internals/log/TxnIndexSearchResult.java | 2 + .../storage/internals/log/UnifiedLog.java | 1 + .../storage/internals/log/LogSegmentTest.java | 1 + .../internals/log/RemoteIndexCacheTest.java | 9 +- .../internals/log/TransactionIndexTest.java | 58 ++++----- .../apache/kafka/tools/DumpLogSegments.java | 4 +- 18 files changed, 128 insertions(+), 188 deletions(-) create mode 100644 clients/src/main/resources/common/message/AbortedTxn.json delete mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java diff --git a/clients/src/main/resources/common/message/AbortedTxn.json b/clients/src/main/resources/common/message/AbortedTxn.json new file mode 100644 index 0000000000000..d6b55de5cdffd --- /dev/null +++ b/clients/src/main/resources/common/message/AbortedTxn.json @@ -0,0 +1,35 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "AbortedTxn", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ + { "name": "ProducerId", "type": "int64", "versions": "0+", + "about": "The producer id associated with the aborted transaction." + }, + { "name": "FirstOffset", "type": "int64", "versions": "0+", + "about": "The first offset in the aborted transaction." + }, + { "name": "LastOffset", "type": "int64", "versions": "0+", + "about": "The last offset in the aborted transaction." + }, + { "name": "LastStableOffset", "type": "int64", "versions": "0+", + "about": "The last stable offset at the time the transaction was aborted." + } + ] +} diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 0485f36b0ab74..926aa14ddefb4 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -30,7 +30,8 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.common.{RequestLocal, TransactionVersion} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.MockTime 
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ @@ -417,8 +418,8 @@ class LogCleanerTest extends Logging { assertEquals(20L, log.logEndOffset) val expectedAbortedTxns = util.List.of( - new AbortedTxn(producerId1, 8, 10, 11), - new AbortedTxn(producerId2, 11, 16, 17) + new AbortedTxn().setProducerId(producerId1).setFirstOffset(8).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(producerId2).setFirstOffset(11).setLastOffset(16).setLastStableOffset(17) ) assertAllTransactionsComplete(log) diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index c479ff1c17499..e65ba72a7a446 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -30,7 +30,8 @@ import org.apache.kafka.metadata.MockConfigRepository import 
org.apache.kafka.server.common.TransactionVersion import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile, UnifiedLog} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile, UnifiedLog} import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} @@ -1281,7 +1282,7 @@ class LogLoaderTest { val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5) val reloadedLog = createLog(logDir, reloadedLogConfig, lastShutdownClean = false) val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog) - assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) + assertEquals(List(new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)), abortedTransactions) } @Test @@ -1332,7 +1333,7 @@ class LogLoaderTest { val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5) val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = 
recoveryPoint, lastShutdownClean = false) val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog) - assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) + assertEquals(List(new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)), abortedTransactions) } @Test @@ -1386,7 +1387,7 @@ class LogLoaderTest { val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5) val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, lastShutdownClean = false) val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog) - assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) + assertEquals(List(new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)), abortedTransactions) } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 5b4fc1d41e4dc..9875a3b2ce441 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -37,7 +37,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG} -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, 
ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import scala.jdk.CollectionConverters._ diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index c54f587d452ae..2aca86a52f441 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -42,7 +42,8 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, 
UnifiedLog, VerificationGuard} import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _} @@ -3112,8 +3113,8 @@ class UnifiedLogTest { val abortedTransactions = LogTestUtils.allAbortedTransactions(log) val expectedTransactions = List( - new AbortedTxn(pid1, 0L, 29L, 8L), - new AbortedTxn(pid2, 8L, 74L, 36L) + new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), + new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L) ) assertEquals(expectedTransactions, abortedTransactions) @@ -3171,8 +3172,8 @@ class UnifiedLogTest { val abortedTransactions = LogTestUtils.allAbortedTransactions(log) val expectedTransactions = List( - new AbortedTxn(pid1, 0L, 29L, 8L), - new AbortedTxn(pid2, 8L, 74L, 36L) + new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), + new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L) ) assertEquals(expectedTransactions, abortedTransactions) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 6e25147eb7c70..945c274df1310 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.internals.SecurityManagerCompatibility; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.metrics.Metrics; import 
org.apache.kafka.common.metrics.Quota; @@ -60,7 +61,6 @@ import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; -import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.AsyncOffsetReader; import org.apache.kafka.storage.internals.log.EpochEntry; @@ -1833,7 +1833,9 @@ private FetchDataInfo addAbortedTransactions(long startOffset, Consumer> accumulator = abortedTxns -> abortedTransactions.addAll(abortedTxns.stream() - .map(AbortedTxn::asAbortedTransaction).toList()); + .map(txn -> new FetchResponseData.AbortedTransaction() + .setProducerId(txn.producerId()) + .setFirstOffset(txn.firstOffset())).toList()); long startTimeNs = time.nanoseconds(); collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java deleted file mode 100644 index 05217d97ba132..0000000000000 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.storage.internals.log; - -import org.apache.kafka.common.message.FetchResponseData; - -import java.nio.ByteBuffer; -import java.util.Objects; - -public class AbortedTxn { - static final int VERSION_OFFSET = 0; - static final int VERSION_SIZE = 2; - static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE; - static final int PRODUCER_ID_SIZE = 8; - static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_SIZE; - static final int FIRST_OFFSET_SIZE = 8; - static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET + FIRST_OFFSET_SIZE; - static final int LAST_OFFSET_SIZE = 8; - static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET + LAST_OFFSET_SIZE; - static final int LAST_STABLE_OFFSET_SIZE = 8; - static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET + LAST_STABLE_OFFSET_SIZE; - - public static final short CURRENT_VERSION = 0; - - final ByteBuffer buffer; - - AbortedTxn(ByteBuffer buffer) { - Objects.requireNonNull(buffer); - this.buffer = buffer; - } - - public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) { - this(completedTxn.producerId(), completedTxn.firstOffset(), completedTxn.lastOffset(), lastStableOffset); - } - - public AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) { - this(toByteBuffer(producerId, firstOffset, lastOffset, lastStableOffset)); - } - - private static ByteBuffer toByteBuffer(long producerId, long firstOffset, long lastOffset, long lastStableOffset) { - ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE); - 
buffer.putShort(CURRENT_VERSION); - buffer.putLong(producerId); - buffer.putLong(firstOffset); - buffer.putLong(lastOffset); - buffer.putLong(lastStableOffset); - buffer.flip(); - return buffer; - } - - public short version() { - return buffer.get(VERSION_OFFSET); - } - - public long producerId() { - return buffer.getLong(PRODUCER_ID_OFFSET); - } - - public long firstOffset() { - return buffer.getLong(FIRST_OFFSET_OFFSET); - } - - public long lastOffset() { - return buffer.getLong(LAST_OFFSET_OFFSET); - } - - public long lastStableOffset() { - return buffer.getLong(LAST_STABLE_OFFSET_OFFSET); - } - - public FetchResponseData.AbortedTransaction asAbortedTransaction() { - return new FetchResponseData.AbortedTransaction() - .setProducerId(producerId()) - .setFirstOffset(firstOffset()); - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - AbortedTxn that = (AbortedTxn) o; - return buffer.equals(that.buffer); - } - - @Override - public int hashCode() { - return buffer.hashCode(); - } - - @Override - public String toString() { - return "AbortedTxn(version=" + version() - + ", producerId=" + producerId() - + ", firstOffset=" + firstOffset() - + ", lastOffset=" + lastOffset() - + ", lastStableOffset=" + lastStableOffset() - + ")"; - } - -} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java index 6c4224aa2cdc9..e076f7d4f3db1 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.internal.ControlRecordType; import 
org.apache.kafka.common.record.internal.Record; import org.apache.kafka.common.record.internal.RecordBatch; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java index 47d4d8fbb39b4..de7c8893abdf9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.internal.FileRecords; import org.apache.kafka.common.record.internal.MemoryRecords; import org.apache.kafka.common.record.internal.MutableRecordBatch; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java index b8ecc886ab099..786673db1dda9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.record.internal.FileLogInputStream; import org.apache.kafka.common.record.internal.FileRecords; @@ -540,7 +541,9 @@ FetchDataInfo addAbortedTransactions(long startOffset, LogSegment segment, Fetch List abortedTransactions = new ArrayList<>(); Consumer> accumulator = abortedTxns -> { for (AbortedTxn abortedTxn : abortedTxns) - abortedTransactions.add(abortedTxn.asAbortedTransaction()); + 
abortedTransactions.add(new FetchResponseData.AbortedTransaction() + .setProducerId(abortedTxn.producerId()) + .setFirstOffset(abortedTxn.firstOffset())); }; collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator); return new FetchDataInfo(fetchInfo.fetchOffsetMetadata, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index 4d9769e3e103f..d461a705de56e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch; import org.apache.kafka.common.record.internal.FileRecords; import org.apache.kafka.common.record.internal.FileRecords.LogOffsetPosition; @@ -348,7 +349,11 @@ public int appendFromFile(FileRecords records, int start) throws IOException { public void updateTxnIndex(CompletedTxn completedTxn, long lastStableOffset) throws IOException { if (completedTxn.isAborted()) { LOGGER.trace("Writing aborted transaction {} to transaction index, last stable offset is {}", completedTxn, lastStableOffset); - txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset)); + txnIndex.append(new AbortedTxn() + .setProducerId(completedTxn.producerId()) + .setFirstOffset(completedTxn.firstOffset()) + .setLastOffset(completedTxn.lastOffset()) + .setLastStableOffset(lastStableOffset)); } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 6283012c165fd..815f41c04e3e5 100644 --- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -17,6 +17,9 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.message.AbortedTxn; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Utils; @@ -32,7 +35,6 @@ import java.util.List; import java.util.Optional; import java.util.OptionalLong; -import java.util.function.Supplier; /** * The transaction index maintains metadata about the aborted transactions for each segment. This includes @@ -47,6 +49,9 @@ */ public class TransactionIndex implements Closeable { + private static final int ABORTED_TXN_RECORD_SIZE = + MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, new AbortedTxn()).remaining(); + private record AbortedTxnWithPosition(AbortedTxn txn, int position) { } @@ -82,7 +87,8 @@ public void append(AbortedTxn abortedTxn) throws IOException { + file.getAbsolutePath()); }); lastOffset = OptionalLong.of(abortedTxn.lastOffset()); - Utils.writeFully(channel(), abortedTxn.buffer.duplicate()); + ByteBuffer buffer = MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, abortedTxn); + Utils.writeFully(channel(), buffer); } public void flush() throws IOException { @@ -130,13 +136,11 @@ public void renameTo(File f) throws IOException { } public void truncateTo(long offset) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE); OptionalLong newLastOffset = OptionalLong.empty(); - for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) { + for (AbortedTxnWithPosition txnWithPosition : iterable()) { AbortedTxn abortedTxn = txnWithPosition.txn; - long position = 
txnWithPosition.position; if (abortedTxn.lastOffset() >= offset) { - channel().truncate(position); + channel().truncate(txnWithPosition.position); lastOffset = newLastOffset; return; } @@ -178,8 +182,7 @@ public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBound * @throws CorruptIndexException if any problems are found. */ public void sanityCheck() { - ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE); - for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) { + for (AbortedTxnWithPosition txnWithPosition : iterable()) { AbortedTxn abortedTxn = txnWithPosition.txn; if (abortedTxn.lastOffset() < startOffset) throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index " @@ -216,14 +219,11 @@ private FileChannel channelOrNull() { } private Iterable iterable() { - return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE)); - } - - private Iterable iterable(Supplier allocate) { FileChannel channel = channelOrNull(); if (channel == null) return List.of(); + ByteBuffer buffer = ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE); PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0); return () -> new Iterator<>() { @@ -231,7 +231,7 @@ private Iterable iterable(Supplier allocate) @Override public boolean hasNext() { try { - return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE; + return channel.position() - position.value >= ABORTED_TXN_RECORD_SIZE; } catch (IOException e) { throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); } @@ -240,17 +240,18 @@ public boolean hasNext() { @Override public AbortedTxnWithPosition next() { try { - ByteBuffer buffer = allocate.get(); + buffer.clear(); Utils.readFully(channel, buffer, position.value); buffer.flip(); - AbortedTxn abortedTxn = new AbortedTxn(buffer); - if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION) - throw new KafkaException("Unexpected aborted transaction version " 
+ abortedTxn.version() + short version = buffer.getShort(); + if (version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) + throw new KafkaException("Unexpected aborted transaction version " + version + " in transaction index " + file.getAbsolutePath() + ", current version is " - + AbortedTxn.CURRENT_VERSION); + + AbortedTxn.HIGHEST_SUPPORTED_VERSION); + AbortedTxn abortedTxn = new AbortedTxn(new ByteBufferAccessor(buffer), version); AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); - position.value += AbortedTxn.TOTAL_SIZE; + position.value += ABORTED_TXN_RECORD_SIZE; return nextEntry; } catch (IOException e) { // We received an unexpected error reading from the index file. We propagate this as an diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java index 6f9a425ea1ab9..ef1b14fff6fc4 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.message.AbortedTxn; + import java.util.Collections; import java.util.List; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 3bac4c3b9d842..ae403869ad4ca 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AbortedTxn; import 
org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.internal.CompressionType; diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java index 166032da1f573..9af969fe82de5 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.internal.ControlRecordType; import org.apache.kafka.common.record.internal.EndTransactionMarker; diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java index d8d3a7f58226c..dc9c07839c96b 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; @@ -1189,10 +1190,10 @@ private void createCorruptTxnIndexForSegmentMetadata(File dir, RemoteLogSegmentM txnIdxFile.createNewFile(); TransactionIndex txnIndex = new TransactionIndex(metadata.startOffset(), 
txnIdxFile); List abortedTxns = List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); for (AbortedTxn abortedTxn : abortedTxns) { txnIndex.append(abortedTxn); } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java index c25c880cdf054..96199d7840ec3 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -45,15 +46,15 @@ public void teardown() throws IOException { @Test public void testPositionSetCorrectlyWhenOpened() throws IOException { List abortedTxns = new ArrayList<>(List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40))); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40))); 
abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); index.close(); TransactionIndex reopenedIndex = new TransactionIndex(0L, file); - AbortedTxn anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55); + AbortedTxn anotherAbortedTxn = new AbortedTxn().setProducerId(3L).setFirstOffset(50).setLastOffset(60).setLastStableOffset(55); reopenedIndex.append(anotherAbortedTxn); abortedTxns.add(anotherAbortedTxn); assertEquals(abortedTxns, reopenedIndex.allAbortedTxns()); @@ -62,10 +63,10 @@ public void testPositionSetCorrectlyWhenOpened() throws IOException { @Test public void testSanityCheck() throws IOException { List abortedTxns = List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); index.close(); @@ -77,25 +78,25 @@ public void testSanityCheck() throws IOException { @Test public void testLastOffsetMustIncrease() throws IOException { - index.append(new AbortedTxn(1L, 5, 15, 13)); - assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0, - 15, 11))); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13)); + assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0) + .setLastOffset(15).setLastStableOffset(11))); } @Test public void testLastOffsetCannotDecrease() throws IOException { - index.append(new AbortedTxn(1L, 5, 15, 13)); - 
assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0, - 10, 11))); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13)); + assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0) + .setLastOffset(10).setLastStableOffset(11))); } @Test public void testCollectAbortedTransactions() { List abortedTransactions = List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); @@ -127,10 +128,10 @@ public void testCollectAbortedTransactions() { @Test public void testTruncate() throws IOException { List abortedTransactions = List.of( - new AbortedTxn(0L, 0, 10, 2), - new AbortedTxn(1L, 5, 15, 16), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); @@ -151,8 +152,7 @@ public void testAbortedTxnSerde() { long lastOffset = 299L; long lastStableOffset = 200L; - AbortedTxn abortedTxn = new 
AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset); - assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version()); + AbortedTxn abortedTxn = new AbortedTxn().setProducerId(pid).setFirstOffset(firstOffset).setLastOffset(lastOffset).setLastStableOffset(lastStableOffset); assertEquals(pid, abortedTxn.producerId()); assertEquals(firstOffset, abortedTxn.firstOffset()); assertEquals(lastOffset, abortedTxn.lastOffset()); @@ -162,10 +162,10 @@ public void testAbortedTxnSerde() { @Test public void testRenameIndex() throws IOException { File renamed = TestUtils.tempFile(); - index.append(new AbortedTxn(0L, 0, 10, 2)); + index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2)); index.renameTo(renamed); - index.append(new AbortedTxn(1L, 5, 15, 16)); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16)); List abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions(); assertEquals(2, abortedTxns.size()); @@ -188,7 +188,7 @@ public void testFlush() throws IOException { assertTrue(nonExistentFile.delete()); try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) { testIndex.flush(); - testIndex.append(new AbortedTxn(0L, 0, 10, 2)); + testIndex.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2)); testIndex.flush(); assertNotEquals(0, testIndex.file().length()); } @@ -217,7 +217,7 @@ public void testIsEmptyWhenFileIsEmpty() { @Test public void testIsEmptyWhenFileIsNotEmpty() throws IOException { - index.append(new AbortedTxn(0L, 0, 10, 2)); + index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2)); assertFalse(index.isEmpty()); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java index 69e5cf091ba05..1227477f36912 100644 --- 
a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java +++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.ConsumerProtocolAssignment; import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter; import org.apache.kafka.common.message.ConsumerProtocolSubscription; @@ -60,7 +61,6 @@ import org.apache.kafka.server.util.CommandLineUtils; import org.apache.kafka.snapshot.SnapshotPath; import org.apache.kafka.snapshot.Snapshots; -import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.BatchMetadata; import org.apache.kafka.storage.internals.log.CorruptSnapshotException; import org.apache.kafka.storage.internals.log.LogFileUtils; @@ -151,7 +151,7 @@ public static void main(String[] args) throws IOException { private static void dumpTxnIndex(File file) throws IOException { try (TransactionIndex index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)) { for (AbortedTxn abortedTxn : index.allAbortedTxns()) { - System.out.println("version: " + abortedTxn.version() + + System.out.println("version: " + AbortedTxn.HIGHEST_SUPPORTED_VERSION + " producerId: " + abortedTxn.producerId() + " firstOffset: " + abortedTxn.firstOffset() + " lastOffset: " + abortedTxn.lastOffset() + From d9e6deaf12e5a604a75429f6f8663916c8fc964c Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Tue, 3 Mar 2026 21:08:06 +0800 Subject: [PATCH 02/10] Also validate LOWEST_SUPPORTED_VERSION when reading transaction index --- .../kafka/storage/internals/log/TransactionIndex.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 815f41c04e3e5..7c5d9d4a238f7 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -245,10 +245,10 @@ public AbortedTxnWithPosition next() { buffer.flip(); short version = buffer.getShort(); - if (version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) + if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION || version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) throw new KafkaException("Unexpected aborted transaction version " + version - + " in transaction index " + file.getAbsolutePath() + ", current version is " - + AbortedTxn.HIGHEST_SUPPORTED_VERSION); + + " in transaction index " + file.getAbsolutePath() + ", supported version range is " + + AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " + AbortedTxn.HIGHEST_SUPPORTED_VERSION); AbortedTxn abortedTxn = new AbortedTxn(new ByteBufferAccessor(buffer), version); AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); position.value += ABORTED_TXN_RECORD_SIZE; From de0cebd20a75cbc688c6332981454c791f8b761e Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Tue, 3 Mar 2026 21:16:25 +0800 Subject: [PATCH 03/10] Add unit test to verify binary compatibility with hand-written AbortedTxn --- .../internals/log/TransactionIndexTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java index 96199d7840ec3..9a87687b76d12 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.storage.internals.log; import 
org.apache.kafka.common.message.AbortedTxn; +import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -24,6 +25,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -220,4 +222,31 @@ public void testIsEmptyWhenFileIsNotEmpty() throws IOException { index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2)); assertFalse(index.isEmpty()); } + + @Test + public void testBinaryCompatibilityWithHandWrittenClass() { + long producerId = 983493L; + long firstOffset = 137L; + long lastOffset = 299L; + long lastStableOffset = 200L; + + // Build the expected binary using the same layout as the old hand-written AbortedTxn + ByteBuffer expected = ByteBuffer.allocate(34); + expected.putShort((short) 0); // version + expected.putLong(producerId); + expected.putLong(firstOffset); + expected.putLong(lastOffset); + expected.putLong(lastStableOffset); + expected.flip(); + + // Serialize using the generated class + AbortedTxn abortedTxn = new AbortedTxn() + .setProducerId(producerId) + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setLastStableOffset(lastStableOffset); + ByteBuffer actual = MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, abortedTxn); + + assertEquals(expected, actual); + } } From 1a7b2a563745460f0550b3b15de341530397674d Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Fri, 6 Mar 2026 08:22:10 +0800 Subject: [PATCH 04/10] Add comment about AbortedTxn version handling --- .../apache/kafka/storage/internals/log/TransactionIndex.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 7c5d9d4a238f7..c9e5a135ff9bf 100644 --- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -49,6 +49,8 @@ */ public class TransactionIndex implements Closeable { + // Note: if new fields are added to AbortedTxn, this code may need to be changed to read the + // version bytes first for each record and then determine the record body size based on the version. private static final int ABORTED_TXN_RECORD_SIZE = MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, new AbortedTxn()).remaining(); From c665018e965b02c94a125ff463694e3cc8150b6d Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Fri, 6 Mar 2026 08:43:50 +0800 Subject: [PATCH 05/10] Add note in AbortedTxn.json about TransactionIndex compatibility --- clients/src/main/resources/common/message/AbortedTxn.json | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/resources/common/message/AbortedTxn.json b/clients/src/main/resources/common/message/AbortedTxn.json index d6b55de5cdffd..bcd84dac95387 100644 --- a/clients/src/main/resources/common/message/AbortedTxn.json +++ b/clients/src/main/resources/common/message/AbortedTxn.json @@ -31,5 +31,6 @@ { "name": "LastStableOffset", "type": "int64", "versions": "0+", "about": "The last stable offset at the time the transaction was aborted" } + // Note: adding new fields may require TransactionIndex to be refactored to read version-per-record. ] } From dcd21e6be40ec6710f781f251dede2e1ab9fbfd6 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Fri, 6 Mar 2026 16:18:36 +0800 Subject: [PATCH 06/10] Fix iterable() to return independent iterators Move buffer and position allocation inside the iterator() lambda so each call to iterator() gets its own independent state, properly satisfying the Iterable contract. Add a test to verify this behavior. 
--- .../internals/log/TransactionIndex.java | 67 ++++++++++--------- .../internals/log/TransactionIndexTest.java | 35 ++++++++++ 2 files changed, 69 insertions(+), 33 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index c9e5a135ff9bf..9f1057c7228cc 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -225,43 +225,44 @@ private Iterable iterable() { if (channel == null) return List.of(); - ByteBuffer buffer = ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE); - PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0); - - return () -> new Iterator<>() { - - @Override - public boolean hasNext() { - try { - return channel.position() - position.value >= ABORTED_TXN_RECORD_SIZE; - } catch (IOException e) { - throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); + return () -> { + ByteBuffer buffer = ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE); + PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0); + return new Iterator<>() { + + @Override + public boolean hasNext() { + try { + return channel.position() - position.value >= ABORTED_TXN_RECORD_SIZE; + } catch (IOException e) { + throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); + } } - } - @Override - public AbortedTxnWithPosition next() { - try { - buffer.clear(); - Utils.readFully(channel, buffer, position.value); - buffer.flip(); - - short version = buffer.getShort(); - if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION || version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) - throw new KafkaException("Unexpected aborted transaction version " + version - + " in transaction index " + file.getAbsolutePath() + ", supported version range is " - + 
AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " + AbortedTxn.HIGHEST_SUPPORTED_VERSION); - AbortedTxn abortedTxn = new AbortedTxn(new ByteBufferAccessor(buffer), version); - AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); - position.value += ABORTED_TXN_RECORD_SIZE; - return nextEntry; - } catch (IOException e) { - // We received an unexpected error reading from the index file. We propagate this as an - // UNKNOWN error to the consumer, which will cause it to retry the fetch. - throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e); + @Override + public AbortedTxnWithPosition next() { + try { + buffer.clear(); + Utils.readFully(channel, buffer, position.value); + buffer.flip(); + + short version = buffer.getShort(); + if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION || version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) + throw new KafkaException("Unexpected aborted transaction version " + version + + " in transaction index " + file.getAbsolutePath() + ", supported version range is " + + AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " + AbortedTxn.HIGHEST_SUPPORTED_VERSION); + AbortedTxn abortedTxn = new AbortedTxn(new ByteBufferAccessor(buffer), version); + AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); + position.value += ABORTED_TXN_RECORD_SIZE; + return nextEntry; + } catch (IOException e) { + // We received an unexpected error reading from the index file. We propagate this as an + // UNKNOWN error to the consumer, which will cause it to retry the fetch. 
+ throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e); + } } - } + }; }; } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java index 9a87687b76d12..803122ac4568e 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java @@ -25,8 +25,10 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -223,6 +225,39 @@ public void testIsEmptyWhenFileIsNotEmpty() throws IOException { assertFalse(index.isEmpty()); } + @Test + @SuppressWarnings("unchecked") + public void testIterableReturnsIndependentIterators() throws Exception { + List abortedTxns = List.of( + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2), + new AbortedTxn().setProducerId(1L).setFirstOffset(11).setLastOffset(20).setLastStableOffset(15), + new AbortedTxn().setProducerId(2L).setFirstOffset(21).setLastOffset(30).setLastStableOffset(25)); + abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); + + Method iterableMethod = TransactionIndex.class.getDeclaredMethod("iterable"); + iterableMethod.setAccessible(true); + Iterable iterable = (Iterable) iterableMethod.invoke(index); + + Iterator iter1 = iterable.iterator(); + Iterator iter2 = iterable.iterator(); + + // Exhaust iter1 + int count1 = 0; + while (iter1.hasNext()) { + iter1.next(); + count1++; + } + assertEquals(3, count1); + + // iter2 must be independent — still readable from the beginning + int count2 = 0; + while (iter2.hasNext()) { + iter2.next(); + count2++; + } + 
assertEquals(3, count2); + } + @Test public void testBinaryCompatibilityWithHandWrittenClass() { long producerId = 983493L; From 70176766c55684bd82046eb1564536e469f83d67 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Mon, 9 Mar 2026 19:21:06 +0800 Subject: [PATCH 07/10] Simplify iterator state by using anonymous class fields instead of PrimitiveRef --- .../internals/log/TransactionIndex.java | 68 +++++++++---------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 9f1057c7228cc..c4e93aca1867d 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.common.utils.PrimitiveRef; + import org.apache.kafka.common.utils.Utils; import java.io.Closeable; @@ -225,44 +225,42 @@ private Iterable iterable() { if (channel == null) return List.of(); - return () -> { - ByteBuffer buffer = ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE); - PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0); - return new Iterator<>() { - - @Override - public boolean hasNext() { - try { - return channel.position() - position.value >= ABORTED_TXN_RECORD_SIZE; - } catch (IOException e) { - throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); - } + return () -> new Iterator<>() { + private final ByteBuffer buffer = ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE); + private int position = 0; + + @Override + public boolean hasNext() { + try { + return channel.position() - position >= ABORTED_TXN_RECORD_SIZE; + } catch (IOException e) { + 
throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); } + } - @Override - public AbortedTxnWithPosition next() { - try { - buffer.clear(); - Utils.readFully(channel, buffer, position.value); - buffer.flip(); - - short version = buffer.getShort(); - if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION || version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) - throw new KafkaException("Unexpected aborted transaction version " + version - + " in transaction index " + file.getAbsolutePath() + ", supported version range is " - + AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " + AbortedTxn.HIGHEST_SUPPORTED_VERSION); - AbortedTxn abortedTxn = new AbortedTxn(new ByteBufferAccessor(buffer), version); - AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); - position.value += ABORTED_TXN_RECORD_SIZE; - return nextEntry; - } catch (IOException e) { - // We received an unexpected error reading from the index file. We propagate this as an - // UNKNOWN error to the consumer, which will cause it to retry the fetch. 
- throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e); - } + @Override + public AbortedTxnWithPosition next() { + try { + buffer.clear(); + Utils.readFully(channel, buffer, position); + buffer.flip(); + + short version = buffer.getShort(); + if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION || version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) + throw new KafkaException("Unexpected aborted transaction version " + version + + " in transaction index " + file.getAbsolutePath() + ", supported version range is " + + AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " + AbortedTxn.HIGHEST_SUPPORTED_VERSION); + AbortedTxn abortedTxn = new AbortedTxn(new ByteBufferAccessor(buffer), version); + AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position); + position += ABORTED_TXN_RECORD_SIZE; + return nextEntry; + } catch (IOException e) { + // We received an unexpected error reading from the index file. We propagate this as an + // UNKNOWN error to the consumer, which will cause it to retry the fetch. 
+ throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e); } + } - }; }; } From b9f0ef6180084f4e95372c83cde3f7a883971706 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Tue, 10 Mar 2026 17:38:24 +0800 Subject: [PATCH 08/10] Fix spotless and compile errors after trunk merge - Remove blank line between MessageUtil and Utils imports in TransactionIndex.java to satisfy spotless - Add missing AbortedTxn import in UnifiedLogTest.scala --- core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala | 1 + .../org/apache/kafka/storage/internals/log/TransactionIndex.java | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 05e845b6c1348..c3fb08085e3c8 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -24,6 +24,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.AbortedTxn import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.record.internal.FileRecords.TimestampAndOffset diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index c4e93aca1867d..f2eb074926f24 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.protocol.ByteBufferAccessor; import 
org.apache.kafka.common.protocol.MessageUtil; - import org.apache.kafka.common.utils.Utils; import java.io.Closeable; From f28d457926b7bb96bc1af2af1f83c263f9128826 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Fri, 20 Mar 2026 08:06:00 +0800 Subject: [PATCH 09/10] Fix import ordering in UnifiedLogTest --- .../org/apache/kafka/storage/internals/log/UnifiedLogTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index 320aace0569a9..f382fe803a336 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.storage.internals.log; -import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -31,6 +30,7 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; From bdd9099d502c368738f15b8f7d2c10005d1f4254 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Mon, 23 Mar 2026 10:23:22 +0800 Subject: [PATCH 10/10] Fix DumpLogSegmentsTest to use generated AbortedTxn protocol --- .../java/org/apache/kafka/tools/DumpLogSegmentsTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java 
b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java index 5b403d523528d..9221f02df5a95 100644 --- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.SnapshotFooterRecord; @@ -85,7 +86,6 @@ import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.snapshot.RecordsSnapshotWriter; -import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.AppendOrigin; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogConfig; @@ -1486,8 +1486,8 @@ public void testLegacyRecordBatchOutputFormat() throws Exception { public void testDumpTxnIndex() throws Exception { File txnIndexFile = new File(logDir, segmentName + ".txnindex"); try (TransactionIndex index = new TransactionIndex(0L, txnIndexFile)) { - index.append(new AbortedTxn(1L, 0, 10, 11)); - index.append(new AbortedTxn(2L, 15, 25, 26)); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11)); + index.append(new AbortedTxn().setProducerId(2L).setFirstOffset(15).setLastOffset(25).setLastStableOffset(26)); index.flush(); }