From 3fd84ef21caf32574a024c11ab238bc8f8be5921 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 11 Dec 2022 20:09:54 -0800 Subject: [PATCH] KAFKA-14474: Move OffsetIndex to storage module For broader context on this change, please check: KAFKA-14470: Move log layer to storage module --- core/src/main/scala/kafka/log/LazyIndex.scala | 2 +- core/src/main/scala/kafka/log/LocalLog.scala | 2 +- .../src/main/scala/kafka/log/LogSegment.scala | 5 +- .../main/scala/kafka/log/OffsetIndex.scala | 207 --------------- .../kafka/log/remote/RemoteIndexCache.scala | 2 +- .../scala/kafka/tools/DumpLogSegments.scala | 6 +- .../scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../unit/kafka/log/OffsetIndexTest.scala | 34 +-- .../log/remote/RemoteIndexCacheTest.scala | 6 +- .../log/remote/RemoteLogManagerTest.scala | 7 +- .../server/log/internals/OffsetIndex.java | 246 ++++++++++++++++++ 11 files changed, 280 insertions(+), 239 deletions(-) delete mode 100755 core/src/main/scala/kafka/log/OffsetIndex.scala create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/OffsetIndex.java diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala index d267883541203..6725d034a1bee 100644 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ b/core/src/main/scala/kafka/log/LazyIndex.scala @@ -24,7 +24,7 @@ import LazyIndex._ import kafka.utils.CoreUtils.inLock import kafka.utils.threadsafe import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.AbstractIndex +import org.apache.kafka.server.log.internals.{AbstractIndex, OffsetIndex} /** * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index 68bb9d9f8b050..7d2524091a671 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -443,7 +443,7 @@ 
class LocalLog(@volatile private var _dir: File, val fetchSize = fetchInfo.records.sizeInBytes val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) - val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { + val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).orElse { segments.higherSegment(segment.baseOffset).map(_.baseOffset).getOrElse(logEndOffset) } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 0c2a013e11f90..fbe50f8225756 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -32,8 +32,9 @@ import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} -import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult} +import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetIndex, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult} +import java.util.Optional import scala.jdk.CollectionConverters._ import scala.math._ @@ -321,7 +322,7 @@ class LogSegment private[log] (val log: FileRecords, firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size) } - def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] = + def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Optional[Long] = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset) /** diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala deleted file mode 100755 
index 9039f1d4e541d..0000000000000 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io.File -import java.nio.ByteBuffer -import kafka.utils.CoreUtils.inLock -import kafka.utils.Logging -import org.apache.kafka.common.errors.InvalidOffsetException -import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, OffsetPosition} - -/** - * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: - * that is it may not hold an entry for all messages in the log. - * - * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries. - * - * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant - * to locate the offset/location pair for the greatest offset less than or equal to the target offset. - * - * Index files can be opened in two ways: either as an empty, mutable index that allows appends or - * an immutable read-only index file that has previously been populated. 
The makeReadOnly method will turn a mutable file into an - * immutable one and truncate off any extra bytes. This is done when the index file is rolled over. - * - * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt. - * - * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the - * message with that offset. The offset stored is relative to the base offset of the index file. So, for example, - * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way let's us use - * only 4 bytes for the offset. - * - * The frequency of entries is up to the user of this class. - * - * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal - * storage format. - */ -// Avoid shadowing mutable `file` in AbstractIndex -class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) - extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) { - import OffsetIndex._ - - override def entrySize = 8 - - /* the last offset in the index */ - private[this] var _lastOffset = lastEntry.offset - - debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " + - s"maxIndexSize = $maxIndexSize, entries = $entries, lastOffset = ${_lastOffset}, file position = ${mmap.position()}") - - /** - * The last entry in the index - */ - private def lastEntry: OffsetPosition = { - inLock(lock) { - entries match { - case 0 => new OffsetPosition(baseOffset, 0) - case s => parseEntry(mmap, s - 1) - } - } - } - - def lastOffset: Long = _lastOffset - - /** - * Find the largest offset less than or equal to the given targetOffset - * and return a pair holding this offset and its corresponding physical file position. - * - * @param targetOffset The offset to look up. 
- * @return The offset found and the corresponding file position for this offset - * If the target offset is smaller than the least entry in the index (or the index is empty), - * the pair (baseOffset, 0) is returned. - */ - def lookup(targetOffset: Long): OffsetPosition = { - maybeLock(lock, {() => - val idx = mmap.duplicate - val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY) - if(slot == -1) - new OffsetPosition(baseOffset, 0) - else - parseEntry(idx, slot) - }) - } - - /** - * Find an upper bound offset for the given fetch starting position and size. This is an offset which - * is guaranteed to be outside the fetched range, but note that it will not generally be the smallest - * such offset. - */ - def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = { - maybeLock(lock, { () => - val idx = mmap.duplicate - val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE) - if (slot == -1) - None - else - Some(parseEntry(idx, slot)) - }) - } - - private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize) - - private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4) - - override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = { - new OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) - } - - /** - * Get the nth offset mapping from the index - * @param n The entry number in the index - * @return The offset/position pair at that entry - */ - def entry(n: Int): OffsetPosition = { - maybeLock(lock, { () => - if (n >= entries) - throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " + - s"which has size $entries.") - parseEntry(mmap, n) - }) - } - - /** - * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. 
- * @throws kafka.common.IndexOffsetOverflowException if the offset causes index offset to overflow - * @throws InvalidOffsetException if provided offset is not larger than the last offset - */ - def append(offset: Long, position: Int): Unit = { - inLock(lock) { - require(!isFull, "Attempt to append to a full index (size = " + entries + ").") - if (entries == 0 || offset > _lastOffset) { - trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") - mmap.putInt(relativeOffset(offset)) - mmap.putInt(position) - incrementEntries() - _lastOffset = offset - require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.") - } else { - throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + - s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") - } - } - } - - override def truncate(): Unit = truncateToEntries(0) - - override def truncateTo(offset: Long): Unit = { - inLock(lock) { - val idx = mmap.duplicate - val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY) - - /* There are 3 cases for choosing the new size - * 1) if there is no entry in the index <= the offset, delete everything - * 2) if there is an entry for this exact offset, delete it and everything larger than it - * 3) if there is no entry for this offset, delete everything larger than the next smallest - */ - val newEntries = - if(slot < 0) - 0 - else if(relativeOffset(idx, slot) == offset - baseOffset) - slot - else - slot + 1 - truncateToEntries(newEntries) - } - } - - /** - * Truncates index to a known number of entries. 
- */ - private def truncateToEntries(entries: Int): Unit = { - inLock(lock) { - super.truncateToEntries0(entries) - _lastOffset = lastEntry.offset - debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" + - s" position is now ${mmap.position()} and last offset is now ${_lastOffset}") - } - } - - override def sanityCheck(): Unit = { - if (entries != 0 && _lastOffset < baseOffset) - throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " + - s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.") - if (length % entrySize != 0) - throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is corrupt, found $length bytes which is " + - s"neither positive nor a multiple of $entrySize.") - } - -} - -object OffsetIndex extends Logging { - override val loggerName: String = classOf[OffsetIndex].getName -} diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala index 594b74f19ffa3..4f80ee7b4f4bb 100644 --- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala +++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala @@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread} import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{OffsetPosition, TransactionIndex} +import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition, TransactionIndex} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager} diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index fe8bfd98d4197..a4914182588c0 100755 --- 
a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.metadata.bootstrap.BootstrapDirectory -import org.apache.kafka.server.log.internals.TransactionIndex +import org.apache.kafka.server.log.internals.{OffsetIndex, TransactionIndex} import org.apache.kafka.snapshot.Snapshots import scala.jdk.CollectionConverters._ @@ -129,7 +129,7 @@ object DumpLogSegments { val startOffset = file.getName.split("\\.")(0).toLong val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix) val fileRecords = FileRecords.open(logFile, false) - val index = new OffsetIndex(file, baseOffset = startOffset, writable = false) + val index = new OffsetIndex(file, startOffset, -1, false) if (index.entries == 0) { println(s"$file is empty.") @@ -171,7 +171,7 @@ object DumpLogSegments { val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix) val fileRecords = FileRecords.open(logFile, false) val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.IndexFileSuffix) - val index = new OffsetIndex(indexFile, baseOffset = startOffset, writable = false) + val index = new OffsetIndex(indexFile, startOffset, -1, false) val timeIndex = new TimeIndex(file, baseOffset = startOffset, writable = false) try { diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 269dff643b515..f877860a8324c 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau import 
org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 -import org.apache.kafka.server.log.internals.AbortedTxn +import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetIndex} import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index d2a64373d75d7..238894f890d38 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -21,14 +21,14 @@ import java.io._ import java.nio.file.Files import org.junit.jupiter.api.Assertions._ -import java.util.{Arrays, Collections} +import java.util.{Arrays, Collections, Optional} import org.junit.jupiter.api._ import scala.collection._ import scala.util.Random import kafka.utils.TestUtils import org.apache.kafka.common.errors.InvalidOffsetException -import org.apache.kafka.server.log.internals.OffsetPosition +import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition} import scala.annotation.nowarn @@ -40,7 +40,7 @@ class OffsetIndexTest { @BeforeEach def setup(): Unit = { - this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset, maxIndexSize = 30 * 8) + this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 30 * 8) } @AfterEach @@ -127,19 +127,19 @@ class OffsetIndexTest { val third = new OffsetPosition(baseOffset + 2, 23) val fourth = new OffsetPosition(baseOffset + 3, 37) - assertEquals(None, idx.fetchUpperBoundOffset(first, 5)) + assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 5)) for (offsetPosition <- Seq(first, second, third, fourth)) idx.append(offsetPosition.offset, 
offsetPosition.position) - assertEquals(Some(second), idx.fetchUpperBoundOffset(first, 5)) - assertEquals(Some(second), idx.fetchUpperBoundOffset(first, 10)) - assertEquals(Some(third), idx.fetchUpperBoundOffset(first, 23)) - assertEquals(Some(third), idx.fetchUpperBoundOffset(first, 22)) - assertEquals(Some(fourth), idx.fetchUpperBoundOffset(second, 24)) - assertEquals(None, idx.fetchUpperBoundOffset(fourth, 1)) - assertEquals(None, idx.fetchUpperBoundOffset(first, 200)) - assertEquals(None, idx.fetchUpperBoundOffset(second, 200)) + assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 5)) + assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 10)) + assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 23)) + assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 22)) + assertEquals(Optional.of(fourth), idx.fetchUpperBoundOffset(second, 24)) + assertEquals(Optional.empty, idx.fetchUpperBoundOffset(fourth, 1)) + assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 200)) + assertEquals(Optional.empty, idx.fetchUpperBoundOffset(second, 200)) } @Test @@ -149,7 +149,7 @@ class OffsetIndexTest { idx.append(first.offset, first.position) idx.append(sec.offset, sec.position) idx.close() - val idxRo = new OffsetIndex(idx.file, baseOffset = idx.baseOffset) + val idxRo = new OffsetIndex(idx.file, idx.baseOffset) assertEquals(first, idxRo.lookup(first.offset)) assertEquals(sec, idxRo.lookup(sec.offset)) assertEquals(sec.offset, idxRo.lastOffset) @@ -159,8 +159,8 @@ class OffsetIndexTest { @Test def truncate(): Unit = { - val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) - idx.truncate() + val idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8) + idx.truncate() for(i <- 1 until 10) idx.append(i, i) @@ -200,7 +200,7 @@ class OffsetIndexTest { @Test def forceUnmapTest(): Unit = { - val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) + val 
idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8) idx.forceUnmap() // mmap should be null after unmap causing lookup to throw a NPE assertThrows(classOf[NullPointerException], () => idx.lookup(1)) @@ -210,7 +210,7 @@ class OffsetIndexTest { def testSanityLastOffsetEqualToBaseOffset(): Unit = { // Test index sanity for the case where the last offset appended to the index is equal to the base offset val baseOffset = 20L - val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = baseOffset, maxIndexSize = 10 * 8) + val idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 10 * 8) idx.append(baseOffset, 0) idx.sanityCheck() } diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala index d518ac48e40f3..102b137dd6510 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -16,10 +16,10 @@ */ package kafka.log.remote -import kafka.log.{OffsetIndex, TimeIndex, UnifiedLog} +import kafka.log.{TimeIndex, UnifiedLog} import kafka.utils.MockTime import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.server.log.internals.OffsetPosition +import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} import org.apache.kafka.test.TestUtils @@ -63,7 +63,7 @@ class RemoteIndexCacheTest { val indexType = ans.getArgument[IndexType](1) val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int] val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix), - metadata.startOffset(), maxIndexSize = maxEntries * 8) + metadata.startOffset(), maxEntries 
* 8) val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix), metadata.startOffset(), maxIndexSize = maxEntries * 12) maybeAppendIndexEntries(offsetIdx, timeIdx) diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala index 6fb62c1484389..142921d6bdd5e 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala @@ -17,7 +17,7 @@ package kafka.log.remote import kafka.cluster.Partition -import kafka.log.{OffsetIndex, TimeIndex, UnifiedLog} +import kafka.log.{TimeIndex, UnifiedLog} import kafka.server.KafkaConfig import kafka.server.checkpoints.LeaderEpochCheckpoint import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} @@ -26,6 +26,7 @@ import org.apache.kafka.common.config.AbstractConfig import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.server.log.internals.OffsetIndex import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage._ import org.apache.kafka.test.TestUtils @@ -187,7 +188,7 @@ class RemoteLogManagerTest { val indexType = ans.getArgument[IndexType](1) val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int] val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix), - metadata.startOffset(), maxIndexSize = maxEntries * 8) + metadata.startOffset(), maxEntries * 8) val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix), metadata.startOffset(), maxIndexSize = maxEntries * 12) 
indexType match { @@ -273,4 +274,4 @@ class RemoteLogManagerTest { new RemoteLogManagerConfig(config) } -} \ No newline at end of file +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetIndex.java new file mode 100644 index 0000000000000..d3aa706ffc289 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/OffsetIndex.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.errors.InvalidOffsetException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; + +/** + * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: + * that is it may not hold an entry for all messages in the log. + * + * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries. + * + * The index supports lookups against a memory-map of this file. 
These lookups are done using a simple binary search variant + * to locate the offset/location pair for the greatest offset less than or equal to the target offset. + * + * Index files can be opened in two ways: either as an empty, mutable index that allows appends or + * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an + * immutable one and truncate off any extra bytes. This is done when the index file is rolled over. + * + * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt. + * + * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the + * message with that offset. The offset stored is relative to the base offset of the index file. So, for example, + * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use + * only 4 bytes for the offset. + * + * The frequency of entries is up to the user of this class. + * + * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal + * storage format. 
+ */ +public class OffsetIndex extends AbstractIndex { + private static final Logger log = LoggerFactory.getLogger(OffsetIndex.class); + private static final int ENTRY_SIZE = 8; + + /* the last offset in the index */ + private long lastOffset; + + public OffsetIndex(File file, long baseOffset) throws IOException { + this(file, baseOffset, -1); + } + + public OffsetIndex(File file, long baseOffset, int maxIndexSize) throws IOException { + this(file, baseOffset, maxIndexSize, true); + } + + public OffsetIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException { + super(file, baseOffset, maxIndexSize, writable); + + lastOffset = lastEntry().offset; + + log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}", + file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastOffset, mmap().position()); + } + + @Override + public void sanityCheck() { + if (entries() != 0 && lastOffset < baseOffset()) + throw new CorruptIndexException("Corrupt index found, index file " + file().getAbsolutePath() + " has non-zero size " + + "but the last offset is " + lastOffset + " which is less than the base offset " + baseOffset()); + if (length() % entrySize() != 0) + throw new CorruptIndexException("Index file " + file().getAbsolutePath() + " is corrupt, found " + length() + + " bytes which is neither positive nor a multiple of " + ENTRY_SIZE); + } + + /** + * Find the largest offset less than or equal to the given targetOffset + * and return a pair holding this offset and its corresponding physical file position. + * + * @param targetOffset The offset to look up. + * @return The offset found and the corresponding file position for this offset + * If the target offset is smaller than the least entry in the index (or the index is empty), + * the pair (baseOffset, 0) is returned. 
+ */ + public OffsetPosition lookup(long targetOffset) { + return maybeLock(lock, () -> { + ByteBuffer idx = mmap().duplicate(); + int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY); + if (slot == -1) + return new OffsetPosition(baseOffset(), 0); + else + return parseEntry(idx, slot); + }); + } + + /** + * Get the nth offset mapping from the index + * @param n The entry number in the index + * @return The offset/position pair at that entry + */ + public OffsetPosition entry(int n) { + return maybeLock(lock, () -> { + if (n >= entries()) + throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " + + file().getAbsolutePath() + ", which has size " + entries()); + return parseEntry(mmap(), n); + }); + } + + /** + * Find an upper bound offset for the given fetch starting position and size. This is an offset which + * is guaranteed to be outside the fetched range, but note that it will not generally be the smallest + * such offset. + */ + public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) { + return maybeLock(lock, () -> { + ByteBuffer idx = mmap().duplicate(); + int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE); + if (slot == -1) + return Optional.empty(); + else + return Optional.of(parseEntry(idx, slot)); + }); + } + + /** + * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. 
+ * @throws IndexOffsetOverflowException if the offset causes index offset to overflow + * @throws InvalidOffsetException if provided offset is not larger than the last offset + */ + public void append(long offset, int position) { + lock.lock(); + try { + if (isFull()) + throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ")."); + + if (entries() == 0 || offset > lastOffset) { + log.trace("Adding index entry {} => {} to {}", offset, position, file().getAbsolutePath()); + mmap().putInt(relativeOffset(offset)); + mmap().putInt(position); + incrementEntries(); + lastOffset = offset; + if (entries() * ENTRY_SIZE != mmap().position()) + throw new IllegalStateException(entries() + " entries but file position in index is " + mmap().position()); + } else + throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() + + " no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath()); + } finally { + lock.unlock(); + } + } + + @Override + public void truncateTo(long offset) { + lock.lock(); + try { + ByteBuffer idx = mmap().duplicate(); + int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY); + + /* There are 3 cases for choosing the new size + * 1) if there is no entry in the index <= the offset, delete everything + * 2) if there is an entry for this exact offset, delete it and everything larger than it + * 3) if there is no entry for this offset, delete everything larger than the next smallest + */ + int newEntries; + if (slot < 0) + newEntries = 0; + else if (relativeOffset(idx, slot) == offset - baseOffset()) + newEntries = slot; + else + newEntries = slot + 1; + truncateToEntries(newEntries); + } finally { + lock.unlock(); + } + } + + public long lastOffset() { + return lastOffset; + } + + @Override + public void truncate() { + truncateToEntries(0); + } + + @Override + protected int entrySize() { + return ENTRY_SIZE; + } + + @Override + 
protected OffsetPosition parseEntry(ByteBuffer buffer, int n) { + return new OffsetPosition(baseOffset() + relativeOffset(buffer, n), physical(buffer, n)); + } + + private int relativeOffset(ByteBuffer buffer, int n) { + return buffer.getInt(n * ENTRY_SIZE); + } + + private int physical(ByteBuffer buffer, int n) { + return buffer.getInt(n * ENTRY_SIZE + 4); + } + + /** + * Truncates index to a known number of entries. + */ + private void truncateToEntries(int entries) { + lock.lock(); + try { + super.truncateToEntries0(entries); + this.lastOffset = lastEntry().offset; + log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}", + file().getAbsolutePath(), entries, mmap().position(), lastOffset); + } finally { + lock.unlock(); + } + } + + /** + * The last entry in the index + */ + private OffsetPosition lastEntry() { + lock.lock(); + try { + int entries = entries(); + if (entries == 0) + return new OffsetPosition(baseOffset(), 0); + else + return parseEntry(mmap(), entries - 1); + } finally { + lock.unlock(); + } + } +}