diff --git a/clients/src/main/resources/common/message/AbortedTxn.json b/clients/src/main/resources/common/message/AbortedTxn.json new file mode 100644 index 0000000000000..bcd84dac95387 --- /dev/null +++ b/clients/src/main/resources/common/message/AbortedTxn.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "AbortedTxn", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ + { "name": "ProducerId", "type": "int64", "versions": "0+", + "about": "The producer id associated with the aborted transaction." + }, + { "name": "FirstOffset", "type": "int64", "versions": "0+", + "about": "The first offset in the aborted transaction." + }, + { "name": "LastOffset", "type": "int64", "versions": "0+", + "about": "The last offset in the aborted transaction." + }, + { "name": "LastStableOffset", "type": "int64", "versions": "0+", + "about": "The last stable offset at the time the transaction was aborted." + } + // Note: adding new fields may require TransactionIndex to be refactored to read the version prefix of each record and derive the record size from it.
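+ // (TransactionIndex currently assumes every serialized entry has the same fixed size: a 2-byte version prefix followed by four int64 fields.)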
+ ] +} diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 0485f36b0ab74..926aa14ddefb4 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -30,7 +30,8 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.common.{RequestLocal, TransactionVersion} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.MockTime -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ @@ -417,8 +418,8 @@ class LogCleanerTest extends Logging { assertEquals(20L, log.logEndOffset) val expectedAbortedTxns = util.List.of( - new AbortedTxn(producerId1, 8, 10, 11), - new AbortedTxn(producerId2, 11, 16, 17) + new AbortedTxn().setProducerId(producerId1).setFirstOffset(8).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(producerId2).setFirstOffset(11).setLastOffset(16).setLastStableOffset(17) ) assertAllTransactionsComplete(log) diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index c479ff1c17499..e65ba72a7a446 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -30,7 +30,8 @@ import org.apache.kafka.metadata.MockConfigRepository import org.apache.kafka.server.common.TransactionVersion import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile, UnifiedLog} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile, UnifiedLog} import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler import 
org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} @@ -1281,7 +1282,7 @@ class LogLoaderTest { val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5) val reloadedLog = createLog(logDir, reloadedLogConfig, lastShutdownClean = false) val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog) - assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) + assertEquals(List(new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)), abortedTransactions) } @Test @@ -1332,7 +1333,7 @@ class LogLoaderTest { val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5) val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, lastShutdownClean = false) val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog) - assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) + assertEquals(List(new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)), abortedTransactions) } @Test @@ -1386,7 +1387,7 @@ class LogLoaderTest { val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 5) val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, lastShutdownClean = false) val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog) - assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) + assertEquals(List(new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)), abortedTransactions) } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 5b4fc1d41e4dc..9875a3b2ce441 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -37,7 +37,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG} -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import scala.jdk.CollectionConverters._ diff --git 
a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 65a41b2579c6e..18c1f29002fa8 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -36,7 +36,8 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.{MockTime, Scheduler} -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} +import org.apache.kafka.common.message.AbortedTxn +import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -128,8 +129,8 @@ class UnifiedLogTest { val abortedTransactions = LogTestUtils.allAbortedTransactions(log) val expectedTransactions = List( - new AbortedTxn(pid1, 0L, 29L, 8L), - new AbortedTxn(pid2, 8L, 74L, 36L) + new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), + new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L) ) assertEquals(expectedTransactions, abortedTransactions) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 27713827fef67..cb2764f142bb5 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.internals.SecurityManagerCompatibility; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Quota; @@ -60,7 +61,6 @@ import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; -import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.AsyncOffsetReader; import org.apache.kafka.storage.internals.log.EpochEntry; @@ -1893,7 +1893,9 @@ private FetchDataInfo addAbortedTransactions(long startOffset, Consumer<List<AbortedTxn>> accumulator = abortedTxns -> abortedTransactions.addAll(abortedTxns.stream() - .map(AbortedTxn::asAbortedTransaction).toList()); + .map(txn -> new FetchResponseData.AbortedTransaction() + .setProducerId(txn.producerId()) + .setFirstOffset(txn.firstOffset())).toList()); long startTimeNs = time.nanoseconds(); collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator,
log); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java deleted file mode 100644 index 05217d97ba132..0000000000000 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.storage.internals.log; - -import org.apache.kafka.common.message.FetchResponseData; - -import java.nio.ByteBuffer; -import java.util.Objects; - -public class AbortedTxn { - static final int VERSION_OFFSET = 0; - static final int VERSION_SIZE = 2; - static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE; - static final int PRODUCER_ID_SIZE = 8; - static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_SIZE; - static final int FIRST_OFFSET_SIZE = 8; - static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET + FIRST_OFFSET_SIZE; - static final int LAST_OFFSET_SIZE = 8; - static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET + LAST_OFFSET_SIZE; - static final int LAST_STABLE_OFFSET_SIZE = 8; - static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET + LAST_STABLE_OFFSET_SIZE; - - public static final short CURRENT_VERSION = 0; - - final ByteBuffer buffer; - - AbortedTxn(ByteBuffer buffer) { - Objects.requireNonNull(buffer); - this.buffer = buffer; - } - - public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) { - this(completedTxn.producerId(), completedTxn.firstOffset(), completedTxn.lastOffset(), lastStableOffset); - } - - public AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) { - this(toByteBuffer(producerId, firstOffset, lastOffset, lastStableOffset)); - } - - private static ByteBuffer toByteBuffer(long producerId, long firstOffset, long lastOffset, long lastStableOffset) { - ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE); - buffer.putShort(CURRENT_VERSION); - buffer.putLong(producerId); - buffer.putLong(firstOffset); - buffer.putLong(lastOffset); - buffer.putLong(lastStableOffset); - buffer.flip(); - return buffer; - } - - public short version() { - return buffer.get(VERSION_OFFSET); - } - - public long producerId() { - return buffer.getLong(PRODUCER_ID_OFFSET); - } - - public long firstOffset() { - return buffer.getLong(FIRST_OFFSET_OFFSET); - } - - public long lastOffset() { - return buffer.getLong(LAST_OFFSET_OFFSET); - } - - public long lastStableOffset() { - return buffer.getLong(LAST_STABLE_OFFSET_OFFSET); - } - - public FetchResponseData.AbortedTransaction asAbortedTransaction() { - return new FetchResponseData.AbortedTransaction() - .setProducerId(producerId()) - .setFirstOffset(firstOffset()); - } - - @Override - public 
boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - AbortedTxn that = (AbortedTxn) o; - return buffer.equals(that.buffer); - } - - @Override - public int hashCode() { - return buffer.hashCode(); - } - - @Override - public String toString() { - return "AbortedTxn(version=" + version() - + ", producerId=" + producerId() - + ", firstOffset=" + firstOffset() - + ", lastOffset=" + lastOffset() - + ", lastStableOffset=" + lastStableOffset() - + ")"; - } - -} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java index 6c4224aa2cdc9..e076f7d4f3db1 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.internal.ControlRecordType; import org.apache.kafka.common.record.internal.Record; import org.apache.kafka.common.record.internal.RecordBatch; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java index 47d4d8fbb39b4..de7c8893abdf9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.internal.FileRecords; import org.apache.kafka.common.record.internal.MemoryRecords; import org.apache.kafka.common.record.internal.MutableRecordBatch; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java index b8ecc886ab099..786673db1dda9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.record.internal.FileLogInputStream; import org.apache.kafka.common.record.internal.FileRecords; @@ -540,7 +541,9 @@ FetchDataInfo addAbortedTransactions(long startOffset, LogSegment segment, Fetch List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>(); Consumer<List<AbortedTxn>> accumulator = abortedTxns -> { for (AbortedTxn abortedTxn : abortedTxns) - abortedTransactions.add(abortedTxn.asAbortedTransaction()); + abortedTransactions.add(new FetchResponseData.AbortedTransaction() + .setProducerId(abortedTxn.producerId()) + .setFirstOffset(abortedTxn.firstOffset())); }; collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator); return new FetchDataInfo(fetchInfo.fetchOffsetMetadata, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index 4d9769e3e103f..d461a705de56e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch; import org.apache.kafka.common.record.internal.FileRecords; import org.apache.kafka.common.record.internal.FileRecords.LogOffsetPosition; @@ -348,7 +349,11 @@ public int appendFromFile(FileRecords records, int start) throws IOException { public void updateTxnIndex(CompletedTxn completedTxn, long lastStableOffset) throws IOException { if (completedTxn.isAborted()) { LOGGER.trace("Writing aborted transaction {} to transaction index, last stable offset is {}", completedTxn, lastStableOffset); - txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset)); + txnIndex.append(new AbortedTxn() + .setProducerId(completedTxn.producerId()) + .setFirstOffset(completedTxn.firstOffset()) + .setLastOffset(completedTxn.lastOffset()) + .setLastStableOffset(lastStableOffset)); } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 6283012c165fd..f2eb074926f24 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -17,7 +17,9 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.message.AbortedTxn; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.utils.Utils; import java.io.Closeable; @@ -32,7 +34,6 @@ import java.util.List; import java.util.Optional; import java.util.OptionalLong; -import java.util.function.Supplier; /** * The transaction index maintains metadata about the aborted transactions for each segment. This includes @@ -47,6 +48,11 @@ */ public class TransactionIndex implements Closeable { + // Note: if new fields are added to AbortedTxn, this code may need to be changed to read the + // version bytes first for each record and then determine the record body size based on the version. 
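+ // At the current version, ABORTED_TXN_RECORD_SIZE works out to 34 bytes (a 2-byte version prefix followed by four int64 fields), the same fixed-size layout the previous hand-written AbortedTxn class wrote to disk.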
+ private static final int ABORTED_TXN_RECORD_SIZE = + MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, new AbortedTxn()).remaining(); + private record AbortedTxnWithPosition(AbortedTxn txn, int position) { } @@ -82,7 +88,8 @@ public void append(AbortedTxn abortedTxn) throws IOException { + file.getAbsolutePath()); }); lastOffset = OptionalLong.of(abortedTxn.lastOffset()); - Utils.writeFully(channel(), abortedTxn.buffer.duplicate()); + ByteBuffer buffer = MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, abortedTxn); + Utils.writeFully(channel(), buffer); } public void flush() throws IOException { @@ -130,13 +137,11 @@ public void renameTo(File f) throws IOException { } public void truncateTo(long offset) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE); OptionalLong newLastOffset = OptionalLong.empty(); - for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) { + for (AbortedTxnWithPosition txnWithPosition : iterable()) { AbortedTxn abortedTxn = txnWithPosition.txn; - long position = txnWithPosition.position; if (abortedTxn.lastOffset() >= offset) { - channel().truncate(position); + channel().truncate(txnWithPosition.position); lastOffset = newLastOffset; return; } @@ -178,8 +183,7 @@ public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBound * @throws CorruptIndexException if any problems are found. */ public void sanityCheck() { - ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE); - for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) { + for (AbortedTxnWithPosition txnWithPosition : iterable()) { AbortedTxn abortedTxn = txnWithPosition.txn; if (abortedTxn.lastOffset() < startOffset) throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index " @@ -216,22 +220,18 @@ private FileChannel channelOrNull() { } private Iterable<AbortedTxnWithPosition> iterable() { - return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE)); - } - - private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) { FileChannel channel = channelOrNull(); if (channel == null) return List.of(); - PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0); - return () -> new Iterator<>() { + private final ByteBuffer buffer = ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE); + private int position = 0; @Override public boolean hasNext() { try { - return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE; + return channel.position() - position >= ABORTED_TXN_RECORD_SIZE; } catch (IOException e) { throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e); } @@ -240,17 +240,18 @@ public boolean hasNext() { @Override public AbortedTxnWithPosition next() { try { - ByteBuffer buffer = allocate.get(); - Utils.readFully(channel, buffer, position.value); + buffer.clear(); + Utils.readFully(channel, buffer, position); buffer.flip(); - AbortedTxn abortedTxn = new AbortedTxn(buffer); - if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION) - throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version() - + " in transaction index " + file.getAbsolutePath() + ", current version is " - + AbortedTxn.CURRENT_VERSION); - AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value); - position.value += AbortedTxn.TOTAL_SIZE; + short version = buffer.getShort(); + if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION || version > AbortedTxn.HIGHEST_SUPPORTED_VERSION) + throw new
KafkaException("Unexpected aborted transaction version " + version + + " in transaction index " + file.getAbsolutePath() + ", supported version range is " + + AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " + AbortedTxn.HIGHEST_SUPPORTED_VERSION); + AbortedTxn abortedTxn = new AbortedTxn(new ByteBufferAccessor(buffer), version); + AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position); + position += ABORTED_TXN_RECORD_SIZE; return nextEntry; } catch (IOException e) { // We received an unexpected error reading from the index file. We propagate this as an diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java index 6f9a425ea1ab9..ef1b14fff6fc4 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.message.AbortedTxn; + import java.util.Collections; import java.util.List; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index e523a3218c383..65f5042c41aeb 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.internal.CompressionType; diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java index 166032da1f573..9af969fe82de5 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.internal.ControlRecordType; import org.apache.kafka.common.record.internal.EndTransactionMarker; diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java index d8d3a7f58226c..dc9c07839c96b 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; @@ -1189,10 +1190,10 @@ private void createCorruptTxnIndexForSegmentMetadata(File dir, RemoteLogSegmentM txnIdxFile.createNewFile(); TransactionIndex txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile); List<AbortedTxn> abortedTxns = List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); for (AbortedTxn abortedTxn : abortedTxns) { txnIndex.append(abortedTxn); } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java index c25c880cdf054..803122ac4568e 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.message.AbortedTxn; +import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -23,7 +25,10 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -45,15 +50,15 @@ public void teardown() throws IOException { @Test public void testPositionSetCorrectlyWhenOpened() throws IOException { List<AbortedTxn> abortedTxns = new ArrayList<>(List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40))); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40))); abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); index.close(); TransactionIndex reopenedIndex = new TransactionIndex(0L, file); - AbortedTxn anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55); + AbortedTxn anotherAbortedTxn = new AbortedTxn().setProducerId(3L).setFirstOffset(50).setLastOffset(60).setLastStableOffset(55); reopenedIndex.append(anotherAbortedTxn); abortedTxns.add(anotherAbortedTxn); assertEquals(abortedTxns, reopenedIndex.allAbortedTxns()); @@ -62,10 +67,10 @@ public void testPositionSetCorrectlyWhenOpened() throws IOException { @Test public void testSanityCheck() throws IOException { List<AbortedTxn> abortedTxns = List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new
AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); index.close(); @@ -77,25 +82,25 @@ @Test public void testLastOffsetMustIncrease() throws IOException { - index.append(new AbortedTxn(1L, 5, 15, 13)); - assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0, - 15, 11))); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13)); + assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0) + .setLastOffset(15).setLastStableOffset(11))); } @Test public void testLastOffsetCannotDecrease() throws IOException { - index.append(new AbortedTxn(1L, 5, 15, 13)); - assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0, - 10, 11))); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13)); + assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0) + .setLastOffset(10).setLastStableOffset(11))); } @Test public void testCollectAbortedTransactions() { List<AbortedTxn> abortedTransactions = List.of( - new AbortedTxn(0L, 0, 10, 11), - new AbortedTxn(1L, 5, 15, 13), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); @@ -127,10 +132,10 @@ public void testCollectAbortedTransactions() { @Test public void testTruncate() throws IOException { List<AbortedTxn> abortedTransactions = List.of( - new AbortedTxn(0L, 0, 10, 2), - new AbortedTxn(1L, 5, 15, 16), - new AbortedTxn(2L, 18, 35, 25), - new AbortedTxn(3L, 32, 50, 40)); + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2), + new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16), + new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25), + new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)); abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); @@ -151,8 +156,7 @@ public void testAbortedTxnSerde() { long lastOffset = 299L; long lastStableOffset = 200L; - AbortedTxn abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset); - assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version()); + AbortedTxn abortedTxn = new AbortedTxn().setProducerId(pid).setFirstOffset(firstOffset).setLastOffset(lastOffset).setLastStableOffset(lastStableOffset); assertEquals(pid, abortedTxn.producerId()); assertEquals(firstOffset, abortedTxn.firstOffset()); assertEquals(lastOffset, abortedTxn.lastOffset()); @@ -162,10 +166,10 @@ @Test public
void testRenameIndex() throws IOException { File renamed = TestUtils.tempFile(); - index.append(new AbortedTxn(0L, 0, 10, 2)); + index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2)); index.renameTo(renamed); - index.append(new AbortedTxn(1L, 5, 15, 16)); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16)); List<AbortedTxn> abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions(); assertEquals(2, abortedTxns.size()); @@ -188,7 +192,7 @@ public void testFlush() throws IOException { assertTrue(nonExistentFile.delete()); try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) { testIndex.flush(); - testIndex.append(new AbortedTxn(0L, 0, 10, 2)); + testIndex.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2)); testIndex.flush(); assertNotEquals(0, testIndex.file().length()); } @@ -217,7 +221,67 @@ public void testIsEmptyWhenFileIsEmpty() { @Test public void testIsEmptyWhenFileIsNotEmpty() throws IOException { - index.append(new AbortedTxn(0L, 0, 10, 2)); + index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2)); assertFalse(index.isEmpty()); } + + @Test + @SuppressWarnings("unchecked") + public void testIterableReturnsIndependentIterators() throws Exception { + List<AbortedTxn> abortedTxns = List.of( + new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2), + new AbortedTxn().setProducerId(1L).setFirstOffset(11).setLastOffset(20).setLastStableOffset(15), + new AbortedTxn().setProducerId(2L).setFirstOffset(21).setLastOffset(30).setLastStableOffset(25)); + abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn))); + + Method iterableMethod = TransactionIndex.class.getDeclaredMethod("iterable"); + iterableMethod.setAccessible(true); + Iterable<Object> iterable = (Iterable<Object>) iterableMethod.invoke(index); + + Iterator<Object> iter1 = iterable.iterator(); + Iterator<Object> iter2 = iterable.iterator(); + + // Exhaust iter1 + int count1 = 0; + while (iter1.hasNext()) { + iter1.next(); + count1++; + } + assertEquals(3, count1); + + // iter2 must be independent — still readable from the beginning + int count2 = 0; + while (iter2.hasNext()) { + iter2.next(); + count2++; + } + assertEquals(3, count2); + } + + @Test + public void testBinaryCompatibilityWithHandWrittenClass() { + long producerId = 983493L; + long firstOffset = 137L; + long lastOffset = 299L; + long lastStableOffset = 200L; + + // Build the expected binary using the same layout as the old hand-written AbortedTxn + ByteBuffer expected = ByteBuffer.allocate(34); + expected.putShort((short) 0); // version + expected.putLong(producerId); + expected.putLong(firstOffset); + expected.putLong(lastOffset); + expected.putLong(lastStableOffset); + expected.flip(); + + // Serialize using the generated class + AbortedTxn abortedTxn = new AbortedTxn() + .setProducerId(producerId) + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setLastStableOffset(lastStableOffset); + ByteBuffer actual = MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, abortedTxn); + + assertEquals(expected, actual); + } } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index a9aa710c701a4..f382fe803a336 100644 ---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; @@ -4147,8 +4148,8 @@ public void testTransactionIndexUpdated() throws IOException { abortedTransactions.addAll(segment.txnIndex().allAbortedTxns()); } List<AbortedTxn> expectedTransactions = List.of( - new AbortedTxn(pid1, 0L, 29L, 8L), - new AbortedTxn(pid2, 8L, 74L, 36L) + new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L), + new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L) ); assertEquals(expectedTransactions, abortedTransactions); diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java index e0e042ecae826..df4077060f6a1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java +++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.ConsumerProtocolAssignment; import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter; import org.apache.kafka.common.message.ConsumerProtocolSubscription; @@ -60,7 +61,6 @@ import org.apache.kafka.server.util.CommandLineUtils; import org.apache.kafka.snapshot.SnapshotPath; import org.apache.kafka.snapshot.Snapshots; -import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.BatchMetadata; import org.apache.kafka.storage.internals.log.CorruptSnapshotException; import org.apache.kafka.storage.internals.log.LogFileUtils; @@ -156,7 +156,7 @@ public static void main(String[] args) throws IOException { private static void dumpTxnIndex(File file) throws IOException { try (TransactionIndex index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)) { for (AbortedTxn abortedTxn : index.allAbortedTxns()) { - System.out.println("version: " + abortedTxn.version() + + System.out.println("version: " + AbortedTxn.HIGHEST_SUPPORTED_VERSION + " producerId: " + abortedTxn.producerId() + " firstOffset: " + abortedTxn.firstOffset() + " lastOffset: " + abortedTxn.lastOffset() + diff --git a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java index 5b403d523528d..9221f02df5a95 100644 --- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.LeaderChangeMessage; import
org.apache.kafka.common.message.SnapshotFooterRecord; @@ -85,7 +86,6 @@ import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.snapshot.RecordsSnapshotWriter; -import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.AppendOrigin; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogConfig; @@ -1486,8 +1486,8 @@ public void testLegacyRecordBatchOutputFormat() throws Exception { public void testDumpTxnIndex() throws Exception { File txnIndexFile = new File(logDir, segmentName + ".txnindex"); try (TransactionIndex index = new TransactionIndex(0L, txnIndexFile)) { - index.append(new AbortedTxn(1L, 0, 10, 11)); - index.append(new AbortedTxn(2L, 15, 25, 26)); + index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11)); + index.append(new AbortedTxn().setProducerId(2L).setFirstOffset(15).setLastOffset(25).setLastStableOffset(26)); index.flush(); }