From a776107931e57ed4236166d28fc4e1c4c04d1ec6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 11 Dec 2022 21:23:21 -0800 Subject: [PATCH 1/3] Move TimeIndex to storage module --- core/src/main/scala/kafka/log/LazyIndex.scala | 2 +- .../src/main/scala/kafka/log/LogSegment.scala | 9 +- core/src/main/scala/kafka/log/TimeIndex.scala | 228 -------------- .../kafka/log/remote/RemoteIndexCache.scala | 2 +- .../scala/kafka/tools/DumpLogSegments.scala | 4 +- .../scala/unit/kafka/log/TimeIndexTest.scala | 6 +- .../log/remote/RemoteIndexCacheTest.scala | 8 +- .../log/remote/RemoteLogManagerTest.scala | 6 +- .../kafka/server/log/internals/TimeIndex.java | 289 ++++++++++++++++++ 9 files changed, 307 insertions(+), 247 deletions(-) delete mode 100644 core/src/main/scala/kafka/log/TimeIndex.scala create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala index 6725d034a1bee..26afbd9986522 100644 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ b/core/src/main/scala/kafka/log/LazyIndex.scala @@ -24,7 +24,7 @@ import LazyIndex._ import kafka.utils.CoreUtils.inLock import kafka.utils.threadsafe import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{AbstractIndex, OffsetIndex} +import org.apache.kafka.server.log.internals.{AbstractIndex, OffsetIndex, TimeIndex} /** * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index fbe50f8225756..4ae8c924d9f96 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} -import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetIndex, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult} +import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult} import java.util.Optional import scala.jdk.CollectionConverters._ @@ -381,7 +381,7 @@ class LogSegment private[log] (val log: FileRecords, log.truncateTo(validBytes) offsetIndex.trimToValidSize() // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well. - timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true) + timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true) timeIndex.trimToValidSize() truncated } @@ -511,7 +511,7 @@ class LogSegment private[log] (val log: FileRecords, * The time index entry appended will be used to decide when to delete the segment. 
*/ def onBecomeInactiveSegment(): Unit = { - timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true) + timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true) offsetIndex.trimToValidSize() timeIndex.trimToValidSize() log.trim() @@ -593,8 +593,7 @@ class LogSegment private[log] (val log: FileRecords, */ def close(): Unit = { if (_maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) - CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, - skipFullCheck = true), this) + CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true), this) CoreUtils.swallow(lazyOffsetIndex.close(), this) CoreUtils.swallow(lazyTimeIndex.close(), this) CoreUtils.swallow(log.close(), this) diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala deleted file mode 100644 index f5ffc7f6c5291..0000000000000 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io.File -import java.nio.ByteBuffer -import kafka.utils.CoreUtils.inLock -import kafka.utils.Logging -import org.apache.kafka.common.errors.InvalidOffsetException -import org.apache.kafka.common.record.RecordBatch -import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, TimestampOffset} - -/** - * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be - * sparse, i.e. it may not hold an entry for all the messages in the segment. - * - * The index is stored in a file that is preallocated to hold a fixed maximum amount of 12-byte time index entries. - * The file format is a series of time index entries. The physical format is a 8 bytes timestamp and a 4 bytes "relative" - * offset used in the [[OffsetIndex]]. A time index entry (TIMESTAMP, OFFSET) means that the biggest timestamp seen - * before OFFSET is TIMESTAMP. i.e. Any message whose timestamp is greater than TIMESTAMP must come after OFFSET. - * - * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal - * storage format. - * - * The timestamps in the same time index file are guaranteed to be monotonically increasing. - * - * The index supports timestamp lookup for a memory map of this file. The lookup is done using a binary search to find - * the offset of the message whose indexed timestamp is closest but smaller or equals to the target timestamp. 
- * - * Time index files can be opened in two ways: either as an empty, mutable index that allows appending or - * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an - * immutable one and truncate off any extra bytes. This is done when the index file is rolled over. - * - * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt. - * - */ -// Avoid shadowing mutable file in AbstractIndex -class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) - extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) { - import TimeIndex._ - - @volatile private var _lastEntry = lastEntryFromIndexFile - - override def entrySize = 12 - - debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," + - s" entries = $entries, lastOffset = ${_lastEntry}, file position = ${mmap.position()}") - - // We override the full check to reserve the last time index entry slot for the on roll call. - override def isFull: Boolean = entries >= maxEntries - 1 - - private def timestamp(buffer: ByteBuffer, n: Int): Long = buffer.getLong(n * entrySize) - - private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8) - - def lastEntry: TimestampOffset = _lastEntry - - /** - * Read the last entry from the index file. This operation involves disk access. - */ - private def lastEntryFromIndexFile: TimestampOffset = { - inLock(lock) { - entries match { - case 0 => new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) - case s => parseEntry(mmap, s - 1) - } - } - } - - /** - * Get the nth timestamp mapping from the time index - * @param n The entry number in the time index - * @return The timestamp/offset pair at that entry - */ - def entry(n: Int): TimestampOffset = { - maybeLock(lock, { () => - if(n >= entries) - throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from time index ${file.getAbsolutePath} " + - s"which has size $entries.") - parseEntry(mmap, n) - }) - } - - override def parseEntry(buffer: ByteBuffer, n: Int): TimestampOffset = { - new TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n)) - } - - /** - * Attempt to append a time index entry to the time index. - * The new entry is appended only if both the timestamp and offset are greater than the last appended timestamp and - * the last appended offset. - * - * @param timestamp The timestamp of the new time index entry - * @param offset The offset of the new time index entry - * @param skipFullCheck To skip checking whether the segment is full or not. We only skip the check when the segment - * gets rolled or the segment is closed. - */ - def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = { - inLock(lock) { - if (!skipFullCheck) - require(!isFull, "Attempt to append to a full time index (size = " + entries + ").") - // We do not throw exception when the offset equals to the offset of last entry. That means we are trying - // to insert the same time index entry as the last entry. - // If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion - // because that could happen in the following two scenarios: - // 1. A log segment is closed. - // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled. 
- if (entries != 0 && offset < lastEntry.offset) - throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot $entries no larger than" + - s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.") - if (entries != 0 && timestamp < lastEntry.timestamp) - throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot $entries no larger" + - s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.") - // We only append to the time index when the timestamp is greater than the last inserted timestamp. - // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time - // index will be empty. - if (timestamp > lastEntry.timestamp) { - trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.") - mmap.putLong(timestamp) - mmap.putInt(relativeOffset(offset)) - incrementEntries() - _lastEntry = new TimestampOffset(timestamp, offset) - require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.") - } - } - } - - /** - * Find the time index entry whose timestamp is less than or equal to the given timestamp. - * If the target timestamp is smaller than the least timestamp in the time index, (NoTimestamp, baseOffset) is - * returned. - * - * @param targetTimestamp The timestamp to look up. - * @return The time index entry found. - */ - def lookup(targetTimestamp: Long): TimestampOffset = { - maybeLock(lock, {() => - val idx = mmap.duplicate - val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY) - if (slot == -1) - new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) - else - parseEntry(idx, slot) - }) - } - - override def truncate(): Unit = truncateToEntries(0) - - /** - * Remove all entries from the index which have an offset greater than or equal to the given offset. - * Truncating to an offset larger than the largest in the index has no effect. - */ - override def truncateTo(offset: Long): Unit = { - inLock(lock) { - val idx = mmap.duplicate - val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.VALUE) - - /* There are 3 cases for choosing the new size - * 1) if there is no entry in the index <= the offset, delete everything - * 2) if there is an entry for this exact offset, delete it and everything larger than it - * 3) if there is no entry for this offset, delete everything larger than the next smallest - */ - val newEntries = - if(slot < 0) - 0 - else if(relativeOffset(idx, slot) == offset - baseOffset) - slot - else - slot + 1 - truncateToEntries(newEntries) - } - } - - override def resize(newSize: Int): Boolean = { - inLock(lock) { - if (super.resize(newSize)) { - _lastEntry = lastEntryFromIndexFile - true - } else - false - } - } - - /** - * Truncates index to a known number of entries. 
- */ - private def truncateToEntries(entries: Int): Unit = { - inLock(lock) { - super.truncateToEntries0(entries) - _lastEntry = lastEntryFromIndexFile - debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}") - } - } - - override def sanityCheck(): Unit = { - val lastTimestamp = lastEntry.timestamp - val lastOffset = lastEntry.offset - if (entries != 0 && lastTimestamp < timestamp(mmap, 0)) - throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " + - s"non-zero size but the last timestamp is $lastTimestamp which is less than the first timestamp " + - s"${timestamp(mmap, 0)}") - if (entries != 0 && lastOffset < baseOffset) - throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " + - s"non-zero size but the last offset is $lastOffset which is less than the first offset $baseOffset") - if (length % entrySize != 0) - throw new CorruptIndexException(s"Time index file ${file.getAbsolutePath} is corrupt, found $length bytes " + - s"which is neither positive nor a multiple of $entrySize.") - } -} - -object TimeIndex extends Logging { - override val loggerName: String = classOf[TimeIndex].getName -} diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala index 4f80ee7b4f4bb..e48b833fd4383 100644 --- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala +++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala @@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread} import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition, TransactionIndex} +import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager} diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index a4914182588c0..3302bfe91b485 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.metadata.bootstrap.BootstrapDirectory -import org.apache.kafka.server.log.internals.{OffsetIndex, TransactionIndex} +import org.apache.kafka.server.log.internals.{OffsetIndex, TimeIndex, TransactionIndex} import org.apache.kafka.snapshot.Snapshots import scala.jdk.CollectionConverters._ @@ -172,7 +172,7 @@ object DumpLogSegments { val fileRecords = FileRecords.open(logFile, false) val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.IndexFileSuffix) val index = new OffsetIndex(indexFile, startOffset, -1, false) - val timeIndex = new TimeIndex(file, baseOffset = startOffset, writable = false) + val timeIndex = new TimeIndex(file, startOffset, -1, false) try { //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not. 
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala index 49ce3f64ba68c..8502eb06535e2 100644 --- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala @@ -20,7 +20,7 @@ package kafka.log import java.io.File import kafka.utils.TestUtils import org.apache.kafka.common.errors.InvalidOffsetException -import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset} +import org.apache.kafka.server.log.internals.{CorruptIndexException, TimeIndex, TimestampOffset} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} @@ -34,7 +34,7 @@ class TimeIndexTest { @BeforeEach def setup(): Unit = { - this.idx = new TimeIndex(nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12) + this.idx = new TimeIndex(nonExistantTempFile(), baseOffset, maxEntries * 12) } @AfterEach @@ -114,7 +114,7 @@ class TimeIndexTest { var shouldCorruptOffset = false var shouldCorruptTimestamp = false var shouldCorruptLength = false - idx = new TimeIndex(idx.file, baseOffset = baseOffset, maxIndexSize = maxEntries * 12) { + idx = new TimeIndex(idx.file, baseOffset, maxEntries * 12) { override def lastEntry = { val superLastEntry = super.lastEntry val offset = if (shouldCorruptOffset) baseOffset - 1 else superLastEntry.offset diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala index 102b137dd6510..3c45f0916c463 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -16,10 +16,10 @@ */ package kafka.log.remote -import kafka.log.{TimeIndex, UnifiedLog} +import kafka.log.UnifiedLog import kafka.utils.MockTime import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition} +import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition, TimeIndex} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager} import org.apache.kafka.test.TestUtils @@ -65,7 +65,7 @@ class RemoteIndexCacheTest { val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix), metadata.startOffset(), maxEntries * 8) val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix), - metadata.startOffset(), maxIndexSize = maxEntries * 12) + metadata.startOffset(), maxEntries * 12) maybeAppendIndexEntries(offsetIdx, timeIdx) indexType match { case IndexType.OFFSET => new FileInputStream(offsetIdx.file) @@ -229,7 +229,7 @@ class RemoteIndexCacheTest { for (i <- 0 until offsetIndex.maxEntries) { val offset = offsetIndex.baseOffset + i offsetIndex.append(offset, i) - timeIndex.maybeAppend(curTime + i, offset, skipFullCheck = true) + timeIndex.maybeAppend(curTime + i, offset, true) } offsetIndex.flush() timeIndex.flush() diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala index 142921d6bdd5e..8876dbd27f703 100644 --- 
a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala @@ -17,7 +17,7 @@ package kafka.log.remote import kafka.cluster.Partition -import kafka.log.{TimeIndex, UnifiedLog} +import kafka.log.UnifiedLog import kafka.server.KafkaConfig import kafka.server.checkpoints.LeaderEpochCheckpoint import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} @@ -26,7 +26,7 @@ import org.apache.kafka.common.config.AbstractConfig import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.server.log.internals.OffsetIndex +import org.apache.kafka.server.log.internals.{OffsetIndex, TimeIndex} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage._ import org.apache.kafka.test.TestUtils @@ -190,7 +190,7 @@ class RemoteLogManagerTest { val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix), metadata.startOffset(), maxEntries * 8) val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix), - metadata.startOffset(), maxIndexSize = maxEntries * 12) + metadata.startOffset(), maxEntries * 12) indexType match { case IndexType.OFFSET => new FileInputStream(offsetIdx.file) case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java new file mode 100644 index 0000000000000..30126439b983d --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.errors.InvalidOffsetException; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; + +/** + * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be + * sparse, i.e. it may not hold an entry for all the messages in the segment. + * + * The index is stored in a file that is preallocated to hold a fixed maximum amount of 12-byte time index entries. + * The file format is a series of time index entries. 
The physical format is a 8 bytes timestamp and a 4 bytes "relative" + * offset used in the [[OffsetIndex]]. A time index entry (TIMESTAMP, OFFSET) means that the biggest timestamp seen + * before OFFSET is TIMESTAMP. i.e. Any message whose timestamp is greater than TIMESTAMP must come after OFFSET. + * + * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal + * storage format. + * + * The timestamps in the same time index file are guaranteed to be monotonically increasing. + * + * The index supports timestamp lookup for a memory map of this file. The lookup is done using a binary search to find + * the offset of the message whose indexed timestamp is closest but smaller or equals to the target timestamp. + * + * Time index files can be opened in two ways: either as an empty, mutable index that allows appending or + * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an + * immutable one and truncate off any extra bytes. This is done when the index file is rolled over. + * + * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt. + * + */ +public class TimeIndex extends AbstractIndex { + private static final Logger log = LoggerFactory.getLogger(TimeIndex.class); + private static final int ENTRY_SIZE = 12; + + private volatile TimestampOffset lastEntry; + + public TimeIndex(File file, long baseOffset) throws IOException { + this(file, baseOffset, -1); + } + + public TimeIndex(File file, long baseOffset, int maxIndexSize) throws IOException { + this(file, baseOffset, maxIndexSize, true); + } + + public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException { + super(file, baseOffset, maxIndexSize, writable); + + this.lastEntry = lastEntryFromIndexFile(); + + log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}", + file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry, mmap().position()); + } + + @Override + public void sanityCheck() { + long lastTimestamp = lastEntry.timestamp; + long lastOffset = lastEntry.offset; + if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) + throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " + + timestamp(mmap(), 0)); + if (entries() != 0 && lastOffset < baseOffset()) + throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset()); + if (length() % ENTRY_SIZE != 0) + throw new CorruptIndexException("Time index file " + file().getAbsolutePath() + " is corrupt, found " + length() + + " bytes which is neither positive nor a multiple of " + ENTRY_SIZE); + } + + /** + * Remove all entries from the index which have an offset greater than or equal to the given offset. + * Truncating to an offset larger than the largest in the index has no effect. 
+ */ + @Override + public void truncateTo(long offset) { + lock.lock(); + try { + ByteBuffer idx = mmap().duplicate(); + int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.VALUE); + + /* There are 3 cases for choosing the new size + * 1) if there is no entry in the index <= the offset, delete everything + * 2) if there is an entry for this exact offset, delete it and everything larger than it + * 3) if there is no entry for this offset, delete everything larger than the next smallest + */ + int newEntries; + if (slot < 0) + newEntries = 0; + else if (relativeOffset(idx, slot) == offset - baseOffset()) + newEntries = slot; + else + newEntries = slot + 1; + + truncateToEntries(newEntries); + } finally { + lock.unlock(); + } + } + + // We override the full check to reserve the last time index entry slot for the on roll call. + @Override + public boolean isFull() { + return entries() >= maxEntries() - 1; + } + + public TimestampOffset lastEntry() { + return lastEntry; + } + + /** + * Get the nth timestamp mapping from the time index + * @param n The entry number in the time index + * @return The timestamp/offset pair at that entry + */ + public TimestampOffset entry(int n) { + return maybeLock(lock, () -> { + if (n >= entries()) + throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from time index " + + file().getAbsolutePath() + " which has size " + entries()); + return parseEntry(mmap(), n); + }); + } + + /** + * Find the time index entry whose timestamp is less than or equal to the given timestamp. + * If the target timestamp is smaller than the least timestamp in the time index, (NoTimestamp, baseOffset) is + * returned. + * + * @param targetTimestamp The timestamp to look up. + * @return The time index entry found. + */ + public TimestampOffset lookup(long targetTimestamp) { + return maybeLock(lock, () -> { + ByteBuffer idx = mmap().duplicate(); + int slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY); + if (slot == -1) + return new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset()); + else + return parseEntry(idx, slot); + }); + } + + /** + * Equivalent to invoking `maybeAppend(timestamp, offset, false)`. + * + * @see #maybeAppend(long, long, boolean) + */ + public void maybeAppend(long timestamp, long offset) { + maybeAppend(timestamp, offset, false); + } + + /** + * Attempt to append a time index entry to the time index. + * The new entry is appended only if both the timestamp and offset are greater than the last appended timestamp and + * the last appended offset. + * + * @param timestamp The timestamp of the new time index entry + * @param offset The offset of the new time index entry + * @param skipFullCheck To skip checking whether the segment is full or not. We only skip the check when the segment + * gets rolled or the segment is closed. + */ + public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) { + lock.lock(); + try { + if (!skipFullCheck && isFull()) + throw new IllegalArgumentException("Attempt to append to a full time index (size = " + entries() + ")."); + + // We do not throw exception when the offset equals to the offset of last entry. That means we are trying + // to insert the same time index entry as the last entry. + // If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion + // because that could happen in the following two scenarios: + // 1. A log segment is closed. + // 2. 
LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled. + if (entries() != 0 && offset < lastEntry.offset) + throw new InvalidOffsetException("Attempt to append an offset (" + offset + ") to slot " + entries() + + " no larger than the last offset appended (" + lastEntry.offset + ") to " + file().getAbsolutePath()); + if (entries() != 0 && timestamp < lastEntry.timestamp) + throw new IllegalStateException("Attempt to append a timestamp (" + timestamp + ") to slot " + entries() + + " no larger than the last timestamp appended (" + lastEntry.timestamp + ") to " + file().getAbsolutePath()); + + // We only append to the time index when the timestamp is greater than the last inserted timestamp. + // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time + // index will be empty. + if (timestamp > lastEntry.timestamp) { + log.trace("Adding index entry {} => {} to {}.", timestamp, offset, file().getAbsolutePath()); + MappedByteBuffer mmap = mmap(); + mmap.putLong(timestamp); + mmap.putInt(relativeOffset(offset)); + incrementEntries(); + this.lastEntry = new TimestampOffset(timestamp, offset); + if (entries() * ENTRY_SIZE != mmap.position()) + throw new IllegalStateException(entries() + " entries but file position in index is " + mmap.position()); + } + } finally { + lock.unlock(); + } + } + + @Override + public boolean resize(int newSize) throws IOException { + lock.lock(); + try { + if (super.resize(newSize)) { + this.lastEntry = lastEntryFromIndexFile(); + return true; + } else + return false; + } finally { + lock.unlock(); + } + } + + // Visible for testing, we can make this protected once TimeIndexTest is in the same package as this class + @Override + public void truncate() { + truncateToEntries(0); + } + + @Override + protected int entrySize() { + return ENTRY_SIZE; + } + + @Override + protected TimestampOffset parseEntry(ByteBuffer buffer, int n) { + return new TimestampOffset(timestamp(buffer, n), baseOffset() + relativeOffset(buffer, n)); + } + + private long timestamp(ByteBuffer buffer, int n) { + return buffer.getLong(n * ENTRY_SIZE); + } + + private int relativeOffset(ByteBuffer buffer, int n) { + return buffer.getInt(n * ENTRY_SIZE + 8); + } + + /** + * Read the last entry from the index file. This operation involves disk access. + */ + private TimestampOffset lastEntryFromIndexFile() { + lock.lock(); + try { + int entries = entries(); + if (entries == 0) + return new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset()); + else + return parseEntry(mmap(), entries - 1); + } finally { + lock.unlock(); + } + } + + /** + * Truncates index to a known number of entries. 
+ */ + private void truncateToEntries(int entries) { + lock.lock(); + try { + super.truncateToEntries0(entries); + this.lastEntry = lastEntryFromIndexFile(); + log.debug("Truncated index {} to {} entries; position is now {} and last entry is now {}", + file().getAbsolutePath(), entries, mmap().position(), lastEntry); + } finally { + lock.unlock(); + } + } +} From 9294e307bd906f3128364ea021edd71c9354d8c0 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 19 Dec 2022 12:16:44 -0800 Subject: [PATCH 2/3] Move LazyIndex to storage module --- core/src/main/scala/kafka/log/LazyIndex.scala | 166 ------------ .../src/main/scala/kafka/log/LogSegment.scala | 6 +- .../kafka/log/remote/RemoteIndexCache.scala | 12 +- .../scala/unit/kafka/log/LogTestUtils.scala | 6 +- .../kafka/server/log/internals/LazyIndex.java | 245 ++++++++++++++++++ .../kafka/server/log/internals/TimeIndex.java | 5 +- 6 files changed, 260 insertions(+), 180 deletions(-) delete mode 100644 core/src/main/scala/kafka/log/LazyIndex.scala create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/LazyIndex.java diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala deleted file mode 100644 index 26afbd9986522..0000000000000 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io.File -import java.nio.file.{Files, NoSuchFileException} -import java.util.concurrent.locks.ReentrantLock -import LazyIndex._ -import kafka.utils.CoreUtils.inLock -import kafka.utils.threadsafe -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{AbstractIndex, OffsetIndex, TimeIndex} - -/** - * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading - * (i.e. memory mapping) the underlying index until it is accessed for the first time via the - * `get` method. - * - * In addition, this class exposes a number of methods (e.g. updateParentDir, renameTo, close, - * etc.) that provide the desired behavior without causing the index to be loaded. If the index - * had previously been loaded, the methods in this class simply delegate to the relevant method in - * the index. - * - * This is an important optimization with regards to broker start-up and shutdown time if it has a - * large number of segments. - * - * Methods of this class are thread safe. Make sure to check `AbstractIndex` subclasses - * documentation to establish their thread safety. - * - * @param loadIndex A function that takes a `File` pointing to an index and returns a loaded - * `AbstractIndex` instance. 
- */ -@threadsafe -class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: IndexWrapper, loadIndex: File => T) { - - private val lock = new ReentrantLock() - - def file: File = indexWrapper.file - - def get: T = { - indexWrapper match { - case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T] - case _: IndexFile => - inLock(lock) { - indexWrapper match { - case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T] - case indexFile: IndexFile => - val indexValue = new IndexValue(loadIndex(indexFile.file)) - indexWrapper = indexValue - indexValue.index - } - } - } - } - - def updateParentDir(parentDir: File): Unit = { - inLock(lock) { - indexWrapper.updateParentDir(parentDir) - } - } - - def renameTo(f: File): Unit = { - inLock(lock) { - indexWrapper.renameTo(f) - } - } - - def deleteIfExists(): Boolean = { - inLock(lock) { - indexWrapper.deleteIfExists() - } - } - - def close(): Unit = { - inLock(lock) { - indexWrapper.close() - } - } - - def closeHandler(): Unit = { - inLock(lock) { - indexWrapper.closeHandler() - } - } - -} - -object LazyIndex { - - def forOffset(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true): LazyIndex[OffsetIndex] = - new LazyIndex(new IndexFile(file), file => new OffsetIndex(file, baseOffset, maxIndexSize, writable)) - - def forTime(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true): LazyIndex[TimeIndex] = - new LazyIndex(new IndexFile(file), file => new TimeIndex(file, baseOffset, maxIndexSize, writable)) - - private sealed trait IndexWrapper { - - def file: File - - def updateParentDir(f: File): Unit - - def renameTo(f: File): Unit - - def deleteIfExists(): Boolean - - def close(): Unit - - def closeHandler(): Unit - - } - - private class IndexFile(@volatile private var _file: File) extends IndexWrapper { - - def file: File = _file - - def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) - - def renameTo(f: File): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) - catch { - case _: NoSuchFileException if !file.exists => () - } - finally _file = f - } - - def deleteIfExists(): Boolean = Files.deleteIfExists(file.toPath) - - def close(): Unit = () - - def closeHandler(): Unit = () - - } - - private class IndexValue[T <: AbstractIndex](val index: T) extends IndexWrapper { - - def file: File = index.file - - def updateParentDir(parentDir: File): Unit = index.updateParentDir(parentDir) - - def renameTo(f: File): Unit = index.renameTo(f) - - def deleteIfExists(): Boolean = index.deleteIfExists() - - def close(): Unit = index.close() - - def closeHandler(): Unit = index.closeHandler() - - } - -} - diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4ae8c924d9f96..ae9a6cbabc86b 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{BufferSupplier, Time} -import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult} +import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, LazyIndex, OffsetIndex, OffsetPosition, TimeIndex, 
TimestampOffset, TransactionIndex, TxnIndexSearchResult} import java.util.Optional import scala.jdk.CollectionConverters._ @@ -672,8 +672,8 @@ object LogSegment { val maxIndexSize = config.maxIndexSize new LogSegment( FileRecords.open(UnifiedLog.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate), - LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), - LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), + LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize, true), + LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, maxIndexSize, true), new TransactionIndex(baseOffset, UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix)), baseOffset, indexIntervalBytes = config.indexInterval, diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala index e48b833fd4383..e6e69c61fc9fe 100644 --- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala +++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala @@ -16,13 +16,13 @@ */ package kafka.log.remote -import kafka.log.{LazyIndex, _} +import kafka.log.UnifiedLog import kafka.log.remote.RemoteIndexCache.DirName import kafka.utils.{CoreUtils, Logging, ShutdownableThread} import org.apache.kafka.common.Uuid import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex} +import org.apache.kafka.server.log.internals.{LazyIndex, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager} @@ -144,13 +144,13 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM if (offsetIndexFile.exists() && timestampIndexFile.exists() && txnIndexFile.exists()) { val offsetIndex: LazyIndex[OffsetIndex] = { - val index = LazyIndex.forOffset(offsetIndexFile, offset, Int.MaxValue, writable = false) + val index = LazyIndex.forOffset(offsetIndexFile, offset, Int.MaxValue, false) index.get.sanityCheck() index } val timeIndex: LazyIndex[TimeIndex] = { - val index = LazyIndex.forTime(timestampIndexFile, offset, Int.MaxValue, writable = false) + val index = LazyIndex.forTime(timestampIndexFile, offset, Int.MaxValue, false) index.get.sanityCheck() index } @@ -241,7 +241,7 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM val offsetIndex: LazyIndex[OffsetIndex] = loadIndexFile(fileName, UnifiedLog.IndexFileSuffix, rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET), file => { - val index = LazyIndex.forOffset(file, startOffset, Int.MaxValue, writable = false) + val index = LazyIndex.forOffset(file, startOffset, Int.MaxValue, false) index.get.sanityCheck() index }) @@ -249,7 +249,7 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM val timeIndex: LazyIndex[TimeIndex] = loadIndexFile(fileName, UnifiedLog.TimeIndexFileSuffix, rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP), file => { - val index = LazyIndex.forTime(file, startOffset, Int.MaxValue, writable 
= false) + val index = LazyIndex.forTime(file, startOffset, Int.MaxValue, false) index.get.sanityCheck() index }) diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index a293ed9eac928..b4ff5a090c4f8 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -32,7 +32,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import java.nio.file.Files import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import kafka.log -import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex} +import org.apache.kafka.server.log.internals.{AbortedTxn, LazyIndex, TransactionIndex} import scala.collection.Iterable import scala.jdk.CollectionConverters._ @@ -46,8 +46,8 @@ object LogTestUtils { indexIntervalBytes: Int = 10, time: Time = Time.SYSTEM): LogSegment = { val ms = FileRecords.open(UnifiedLog.logFile(logDir, offset)) - val idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000) - val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500) + val idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(logDir, offset), offset, 1000, true) + val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(logDir, offset), offset, 1500, true) val txnIndex = new TransactionIndex(offset, UnifiedLog.transactionIndexFile(logDir, offset)) new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/LazyIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/LazyIndex.java new file mode 100644 index 0000000000000..220ab6464dcc4 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/LazyIndex.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.kafka.common.utils.Utils; + +/** + * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading + * (i.e. memory mapping) the underlying index until it is accessed for the first time via the + * `get` method. + * + * In addition, this class exposes a number of methods (e.g. updateParentDir, renameTo, close, + * etc.) that provide the desired behavior without causing the index to be loaded. 
If the index + * had previously been loaded, the methods in this class simply delegate to the relevant method in + * the index. + * + * This is an important optimization with regards to broker start-up and shutdown time if it has a + * large number of segments. + * + * Methods of this class are thread safe. Make sure to check `AbstractIndex` subclasses + * documentation to establish their thread safety. + */ +public class LazyIndex { + + private enum IndexType { + OFFSET, TIME; + } + + private interface IndexWrapper extends Closeable { + File file(); + void updateParentDir(File file); + void renameTo(File file) throws IOException; + boolean deleteIfExists() throws IOException; + void close() throws IOException; + void closeHandler(); + } + + private static class IndexFile implements IndexWrapper { + + private volatile File file; + + IndexFile(File file) { + this.file = file; + } + + @Override + public File file() { + return file; + } + + @Override + public void updateParentDir(File parentDir) { + file = new File(parentDir, file.getName()); + } + + @Override + public void renameTo(File f) throws IOException { + try { + Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false); + } catch (NoSuchFileException e) { + if (file.exists()) + throw e; + } finally { + file = f; + } + } + + @Override + public boolean deleteIfExists() throws IOException { + return Files.deleteIfExists(file.toPath()); + } + + @Override + public void close() { } + + @Override + public void closeHandler() { } + + } + + private static class IndexValue implements IndexWrapper { + + private final T index; + + IndexValue(T index) { + this.index = index; + } + + @Override + public File file() { + return index.file(); + } + + @Override + public void updateParentDir(File parentDir) { + index.updateParentDir(parentDir); + } + + @Override + public void renameTo(File f) throws IOException { + index.renameTo(f); + } + + @Override + public boolean deleteIfExists() throws IOException { + return index.deleteIfExists(); + } + + @Override + public void close() throws IOException { + index.close(); + } + + @Override + public void closeHandler() { + index.closeHandler(); + } + } + + private final Lock lock = new ReentrantLock(); + private final long baseOffset; + private final int maxIndexSize; + private final boolean writable; + private final IndexType indexType; + + private volatile IndexWrapper indexWrapper; + + private LazyIndex(IndexWrapper indexWrapper, long baseOffset, int maxIndexSize, boolean writable, + IndexType indexType) { + this.indexWrapper = indexWrapper; + this.baseOffset = baseOffset; + this.maxIndexSize = maxIndexSize; + this.writable = writable; + this.indexType = indexType; + } + + public static LazyIndex forOffset(File file, long baseOffset, int maxIndexSize, boolean writable) { + return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, writable, IndexType.OFFSET); + } + + public static LazyIndex forTime(File file, long baseOffset, int maxIndexSize, boolean writable) { + return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, writable, IndexType.TIME); + } + + public File file() { + return indexWrapper.file(); + } + + @SuppressWarnings("unchecked") + public T get() throws IOException { + if (indexWrapper instanceof IndexValue) + return ((IndexValue) indexWrapper).index; + else if (indexWrapper instanceof IndexFile) { + lock.lock(); + try { + IndexFile indexFile = (IndexFile) indexWrapper; + IndexValue indexValue = new IndexValue<>(loadIndex(indexFile.file)); + indexWrapper = 
indexValue; + return indexValue.index; + } finally { + lock.unlock(); + } + } else + throw new IllegalStateException("Unexpected type for indexWrapper " + indexWrapper.getClass()); + } + + public void updateParentDir(File parentDir) { + lock.lock(); + try { + indexWrapper.updateParentDir(parentDir); + } finally { + lock.unlock(); + } + } + + public void renameTo(File f) throws IOException { + lock.lock(); + try { + indexWrapper.renameTo(f); + } finally { + lock.unlock(); + } + } + + public boolean deleteIfExists() throws IOException { + lock.lock(); + try { + return indexWrapper.deleteIfExists(); + } finally { + lock.unlock(); + } + } + + public void close() throws IOException { + lock.lock(); + try { + indexWrapper.close(); + } finally { + lock.unlock(); + } + } + + public void closeHandler() { + lock.lock(); + try { + indexWrapper.closeHandler(); + } finally { + lock.unlock(); + } + } + + @SuppressWarnings("unchecked") + private T loadIndex(File file) throws IOException { + switch (indexType) { + case OFFSET: + return (T) new OffsetIndex(file, baseOffset, maxIndexSize, writable); + case TIME: + return (T) new TimeIndex(file, baseOffset, maxIndexSize, writable); + default: + throw new IllegalStateException("Unexpected indexType " + indexType); + } + } + +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java index 30126439b983d..cdbe16ec6c43c 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java @@ -75,8 +75,9 @@ public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable) @Override public void sanityCheck() { - long lastTimestamp = lastEntry.timestamp; - long lastOffset = lastEntry.offset; + TimestampOffset entry = lastEntry(); + long lastTimestamp = entry.timestamp; + long lastOffset = entry.offset; if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " From 4ae7ae9da48a5de30b9e5abd3edeb80ea29d644f Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 20 Dec 2022 15:48:17 -0800 Subject: [PATCH 3/3] Remove unused TimeIndex constructor --- .../java/org/apache/kafka/server/log/internals/TimeIndex.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java index cdbe16ec6c43c..8c7c6d8d58c2a 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/TimeIndex.java @@ -56,10 +56,6 @@ public class TimeIndex extends AbstractIndex { private volatile TimestampOffset lastEntry; - public TimeIndex(File file, long baseOffset) throws IOException { - this(file, baseOffset, -1); - } - public TimeIndex(File file, long baseOffset, int maxIndexSize) throws IOException { this(file, baseOffset, maxIndexSize, true); }
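
Editor's note (commentary on the series, not part of the patches): the TimeIndex Javadoc moved in patch 1 describes the on-disk layout — each entry is an 8-byte timestamp followed by a 4-byte offset relative to the segment's base offset — and the lookup rule: a binary search for the largest entry whose timestamp is less than or equal to the target. The standalone Java sketch below illustrates both. It is illustrative only, not the Kafka implementation (which memory-maps the file via AbstractIndex and takes locks), and every name in it is hypothetical.

    // Minimal sketch of the 12-byte time index entry layout and lookup.
    import java.nio.ByteBuffer;

    public class TimeIndexSketch {
        static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset

        final ByteBuffer buf;
        final long baseOffset;
        int entries = 0;

        TimeIndexSketch(long baseOffset, int maxEntries) {
            this.baseOffset = baseOffset;
            this.buf = ByteBuffer.allocate(maxEntries * ENTRY_SIZE);
        }

        // Append only when the timestamp is strictly greater than the last one,
        // mirroring the monotonicity guarantee described in the Javadoc.
        void maybeAppend(long timestamp, long offset) {
            if (entries == 0 || timestamp > timestamp(entries - 1)) {
                buf.putLong(timestamp);
                buf.putInt((int) (offset - baseOffset)); // store the relative offset
                entries++;
            }
        }

        long timestamp(int n) {
            return buf.getLong(n * ENTRY_SIZE);
        }

        long offset(int n) {
            // Translate the stored relative offset back to a full offset,
            // as the class's external APIs do.
            return baseOffset + buf.getInt(n * ENTRY_SIZE + 8);
        }

        // Binary search for the largest entry whose timestamp is <= target;
        // returns -1 when the target precedes every indexed timestamp.
        int lookup(long target) {
            int lo = 0, hi = entries - 1, found = -1;
            while (lo <= hi) {
                int mid = (lo + hi) >>> 1;
                if (timestamp(mid) <= target) {
                    found = mid;
                    lo = mid + 1;
                } else {
                    hi = mid - 1;
                }
            }
            return found;
        }

        public static void main(String[] args) {
            TimeIndexSketch idx = new TimeIndexSketch(1000L, 16);
            idx.maybeAppend(50L, 1003L);
            idx.maybeAppend(75L, 1010L);
            int slot = idx.lookup(60L);
            // Prints "50 -> 1003": the biggest timestamp seen before offset 1003 is 50.
            System.out.println(idx.timestamp(slot) + " -> " + idx.offset(slot));
        }
    }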
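
Likewise, the LazyIndex Javadoc moved in patch 2 describes deferring the memory mapping of an index until the first get() call, using a volatile wrapper field guarded by a lock so the load happens at most once. A minimal generic sketch of that double-checked initialization pattern, with java.util.function.Supplier standing in for the real index loader (hypothetical class, not Kafka code):

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Supplier;

    public class Lazy<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Supplier<T> loader;
        private volatile T value; // null until first access

        public Lazy(Supplier<T> loader) {
            this.loader = loader;
        }

        public T get() {
            T v = value;           // fast path: no locking once loaded
            if (v != null)
                return v;
            lock.lock();
            try {
                if (value == null) // re-check under the lock
                    value = loader.get();
                return value;
            } finally {
                lock.unlock();
            }
        }

        public static void main(String[] args) {
            Lazy<String> lazy = new Lazy<>(() -> {
                System.out.println("loading..."); // runs exactly once
                return "mapped index";
            });
            lazy.get(); // triggers the load
            lazy.get(); // served from the volatile field, no lock taken
        }
    }

This is the same reason the real class pays off at broker start-up and shutdown: segments whose indexes are never read never pay the mmap cost.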