From 303355205194f0ff80e8f8f6131f9d00c0fef175 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 11 Dec 2022 15:40:22 -0800 Subject: [PATCH 1/2] Move AbstractIndex and related to storage module --- .../main/scala/kafka/log/AbstractIndex.scala | 440 -------------- core/src/main/scala/kafka/log/LazyIndex.scala | 2 +- .../main/scala/kafka/log/OffsetIndex.scala | 35 +- core/src/main/scala/kafka/log/TimeIndex.scala | 39 +- gradle/spotbugs-exclude.xml | 7 + .../server/log/internals/AbstractIndex.java | 553 ++++++++++++++++++ .../IndexOffsetOverflowException.java | 20 +- .../server/log/internals/IndexSearchType.java | 21 + 8 files changed, 632 insertions(+), 485 deletions(-) delete mode 100644 core/src/main/scala/kafka/log/AbstractIndex.scala create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java rename core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala => storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java (61%) create mode 100644 storage/src/main/java/org/apache/kafka/server/log/internals/IndexSearchType.java diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala deleted file mode 100644 index 1c8032115d890..0000000000000 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ /dev/null @@ -1,440 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io.{Closeable, File, RandomAccessFile} -import java.nio.channels.FileChannel -import java.nio.file.Files -import java.nio.{ByteBuffer, MappedByteBuffer} -import java.util.concurrent.locks.{Lock, ReentrantLock} -import kafka.common.IndexOffsetOverflowException -import kafka.utils.CoreUtils.inLock -import kafka.utils.{CoreUtils, Logging} -import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils} -import org.apache.kafka.server.log.internals.IndexEntry - -/** - * The abstract index class which holds entry format agnostic methods. - * - * @param _file The index file - * @param baseOffset the base offset of the segment that this index is corresponding to. - * @param maxIndexSize The maximum index size in bytes. - */ -abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1, - val writable: Boolean) extends Closeable { - import AbstractIndex._ - - // Length of the index file - @volatile - private var _length: Long = _ - protected def entrySize: Int - - /* - Kafka mmaps index files into memory, and all the read / write operations of the index is through OS page cache. This - avoids blocked disk I/O in most cases. 
- - To the extent of our knowledge, all the modern operating systems use LRU policy or its variants to manage page - cache. Kafka always appends to the end of the index file, and almost all the index lookups (typically from in-sync - followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work very - well with Kafka's index access pattern. - - However, when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary - page faults (the thread is blocked to wait for reading some index entries from hard disk, as those entries are not - cached in the page cache). - - For example, in an index with 13 pages, to lookup an entry in the last page (page #12), the standard binary search - algorithm will read index entries in page #0, 6, 9, 11, and 12. - page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 | - steps: |1| | | | | |3| | |4| |5 |2/6| - In each page, there are hundreds log entries, corresponding to hundreds to thousands of kafka messages. When the - index gradually growing from the 1st entry in page #12 to the last entry in page #12, all the write (append) - operations are in page #12, and all the in-sync follower / consumer lookups read page #0,6,9,11,12. As these pages - are always used in each in-sync lookup, we can assume these pages are fairly recently used, and are very likely to be - in the page cache. When the index grows to page #13, the pages needed in a in-sync lookup change to #0, 7, 10, 12, - and 13: - page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 | - steps: |1| | | | | | |3| | | 4|5 | 6|2/7| - Page #7 and page #10 have not been used for a very long time. They are much less likely to be in the page cache, than - the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is likely to have to read page #7 - and page #10 from disk (page fault), which can take up to more than a second. In our test, this can cause the - at-least-once produce latency to jump to about 1 second from a few ms. - - Here, we use a more cache-friendly lookup algorithm: - if (target > indexEntry[end - N]) // if the target is in the last N entries of the index - binarySearch(end - N, end) - else - binarySearch(begin, end - N) - - If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync - lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in this - relatively small section, the pages containing this section are more likely to be in the page cache. - - We set N (_warmEntries) to 8192, because - 1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section - lookup. So that, the entire warm section is really "warm". - When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N), - and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we - touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS, - SPARC, Power, ARM etc.). - 2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka - settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages. 
- - We can't set make N (_warmEntries) to be larger than 8192, as there is no simple way to guarantee all the "warm" - section pages are really warm (touched in every lookup) on a typical 4KB-page host. - - In there future, we may use a backend thread to periodically touch the entire warm section. So that, we can - 1) support larger warm section - 2) make sure the warm section of low QPS topic-partitions are really warm. - */ - protected def _warmEntries: Int = 8192 / entrySize - - protected val lock = new ReentrantLock - - @volatile - protected var mmap: MappedByteBuffer = { - val newlyCreated = file.createNewFile() - val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r") - try { - /* pre-allocate the file if necessary */ - if(newlyCreated) { - if(maxIndexSize < entrySize) - throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) - raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize)) - } - - /* memory-map the file */ - _length = raf.length() - val idx = { - if (writable) - raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length) - else - raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length) - } - /* set the position in the index for the next entry */ - if(newlyCreated) - idx.position(0) - else - // if this is a pre-existing index, assume it is valid and set position to last entry - idx.position(roundDownToExactMultiple(idx.limit(), entrySize)) - idx - } finally { - CoreUtils.swallow(raf.close(), AbstractIndex) - } - } - - /** - * The maximum number of entries this index can hold - */ - @volatile - private[this] var _maxEntries: Int = mmap.limit() / entrySize - - /** The number of entries in this index */ - @volatile - protected var _entries: Int = mmap.position() / entrySize - - /** - * True iff there are no more slots available in this index - */ - def isFull: Boolean = _entries >= _maxEntries - - def file: File = _file - - def maxEntries: Int = _maxEntries - - def entries: Int = _entries - - def length: Long = _length - - def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) - - /** - * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in - * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at - * loading segments from disk or truncating back to an old segment where a new log segment became active; - * we want to reset the index size to maximum index size to avoid rolling new segment. - * - * @param newSize new size of the index file - * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not. 
- */ - def resize(newSize: Int): Boolean = { - inLock(lock) { - val roundedNewSize = roundDownToExactMultiple(newSize, entrySize) - - if (_length == roundedNewSize) { - debug(s"Index ${file.getAbsolutePath} was not resized because it already has size $roundedNewSize") - false - } else { - val raf = new RandomAccessFile(file, "rw") - try { - val position = mmap.position() - - /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */ - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - safeForceUnmap() - raf.setLength(roundedNewSize) - _length = roundedNewSize - mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) - _maxEntries = mmap.limit() / entrySize - mmap.position(position) - debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position is ${mmap.position()} " + - s"and limit is ${mmap.limit()}") - true - } finally { - CoreUtils.swallow(raf.close(), AbstractIndex) - } - } - } - } - - /** - * Rename the file that backs this offset index - * - * @throws IOException if rename fails - */ - def renameTo(f: File): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) - finally _file = f - } - - /** - * Flush the data in the index to disk - */ - def flush(): Unit = { - inLock(lock) { - mmap.force() - } - } - - /** - * Delete this index file. - * - * @throws IOException if deletion fails due to an I/O error - * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did - * not exist - */ - def deleteIfExists(): Boolean = { - closeHandler() - Files.deleteIfExists(file.toPath) - } - - /** - * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from - * the file. - */ - def trimToValidSize(): Unit = { - inLock(lock) { - resize(entrySize * _entries) - } - } - - /** - * The number of bytes actually used by this index - */ - def sizeInBytes: Int = entrySize * _entries - - /** Close the index */ - def close(): Unit = { - trimToValidSize() - closeHandler() - } - - def closeHandler(): Unit = { - // On JVM, a memory mapping is typically unmapped by garbage collector. - // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. - // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. - // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. - inLock(lock) { - safeForceUnmap() - } - } - - /** - * Do a basic sanity check on this index to detect obvious problems - * - * @throws CorruptIndexException if any problems are found - */ - def sanityCheck(): Unit - - /** - * Remove all the entries from the index. - */ - protected def truncate(): Unit - - /** - * Remove all entries from the index which have an offset greater than or equal to the given offset. - * Truncating to an offset larger than the largest in the index has no effect. - */ - def truncateTo(offset: Long): Unit - - /** - * Remove all the entries from the index and resize the index to the max index size. 
- */ - def reset(): Unit = { - truncate() - resize(maxIndexSize) - } - - /** - * Get offset relative to base offset of this index - * @throws IndexOffsetOverflowException - */ - def relativeOffset(offset: Long): Int = { - val relativeOffset = toRelative(offset) - if (relativeOffset.isEmpty) - throw new IndexOffsetOverflowException(s"Integer overflow for offset: $offset (${file.getAbsoluteFile})") - relativeOffset.get - } - - /** - * Check if a particular offset is valid to be appended to this index. - * @param offset The offset to check - * @return true if this offset is valid to be appended to this index; false otherwise - */ - def canAppendOffset(offset: Long): Boolean = { - toRelative(offset).isDefined - } - - protected def safeForceUnmap(): Unit = { - if (mmap != null) { - try forceUnmap() - catch { - case t: Throwable => error(s"Error unmapping index $file", t) - } - } - } - - /** - * Forcefully free the buffer's mmap. - */ - protected[log] def forceUnmap(): Unit = { - try ByteBufferUnmapper.unmap(file.getAbsolutePath, mmap) - finally mmap = null // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe - } - - /** - * Execute the given function in a lock only if we are running on windows or z/OS. We do this - * because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it - * and this requires synchronizing reads. - */ - protected def maybeLock[T](lock: Lock)(fun: => T): T = { - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - lock.lock() - try fun - finally { - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - lock.unlock() - } - } - - /** - * To parse an entry in the index. - * - * @param buffer the buffer of this memory mapped index. - * @param n the slot - * @return the index entry stored in the given slot. - */ - protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry - - /** - * Find the slot in which the largest entry less than or equal to the given target key or value is stored. - * The comparison is made using the `IndexEntry.compareTo()` method. - * - * @param idx The index buffer - * @param target The index key to look for - * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty - */ - protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int = - indexSlotRangeFor(idx, target, searchEntity)._1 - - /** - * Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned. - */ - protected def smallestUpperBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int = - indexSlotRangeFor(idx, target, searchEntity)._2 - - /** - * Lookup lower and upper bounds for the given target. 
- */ - private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = { - // check if the index is empty - if(_entries == 0) - return (-1, -1) - - def binarySearch(begin: Int, end: Int) : (Int, Int) = { - // binary search for the entry - var lo = begin - var hi = end - while(lo < hi) { - val mid = (lo + hi + 1) >>> 1 - val found = parseEntry(idx, mid) - val compareResult = compareIndexEntry(found, target, searchEntity) - if(compareResult > 0) - hi = mid - 1 - else if(compareResult < 0) - lo = mid - else - return (mid, mid) - } - (lo, if (lo == _entries - 1) -1 else lo + 1) - } - - val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries) - // check if the target offset is in the warm section of the index - if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) { - return binarySearch(firstHotEntry, _entries - 1) - } - - // check if the target offset is smaller than the least offset - if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) - return (-1, 0) - - binarySearch(0, firstHotEntry) - } - - private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchType): Int = { - searchEntity match { - case IndexSearchType.KEY => java.lang.Long.compare(indexEntry.indexKey, target) - case IndexSearchType.VALUE => java.lang.Long.compare(indexEntry.indexValue, target) - } - } - - /** - * Round a number to the greatest exact multiple of the given factor less than the given number. - * E.g. roundDownToExactMultiple(67, 8) == 64 - */ - private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor) - - private def toRelative(offset: Long): Option[Int] = { - val relativeOffset = offset - baseOffset - if (relativeOffset < 0 || relativeOffset > Int.MaxValue) - None - else - Some(relativeOffset.toInt) - } - -} - -object AbstractIndex extends Logging { - override val loggerName: String = classOf[AbstractIndex].getName -} - -sealed trait IndexSearchType -object IndexSearchType { - case object KEY extends IndexSearchType - case object VALUE extends IndexSearchType -} diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala index 0aadb2a4c3819..d267883541203 100644 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ b/core/src/main/scala/kafka/log/LazyIndex.scala @@ -20,11 +20,11 @@ package kafka.log import java.io.File import java.nio.file.{Files, NoSuchFileException} import java.util.concurrent.locks.ReentrantLock - import LazyIndex._ import kafka.utils.CoreUtils.inLock import kafka.utils.threadsafe import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.log.internals.AbstractIndex /** * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 3b3ea461fbddf..9039f1d4e541d 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import kafka.utils.CoreUtils.inLock import kafka.utils.Logging import org.apache.kafka.common.errors.InvalidOffsetException -import org.apache.kafka.server.log.internals.{CorruptIndexException, OffsetPosition} +import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, OffsetPosition} /** * An index that maps offsets to physical file locations for a particular log segment. 
This index may be sparse: @@ -60,14 +60,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl private[this] var _lastOffset = lastEntry.offset debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " + - s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = ${_lastOffset}, file position = ${mmap.position()}") + s"maxIndexSize = $maxIndexSize, entries = $entries, lastOffset = ${_lastOffset}, file position = ${mmap.position()}") /** * The last entry in the index */ private def lastEntry: OffsetPosition = { inLock(lock) { - _entries match { + entries match { case 0 => new OffsetPosition(baseOffset, 0) case s => parseEntry(mmap, s - 1) } @@ -86,14 +86,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl * the pair (baseOffset, 0) is returned. */ def lookup(targetOffset: Long): OffsetPosition = { - maybeLock(lock) { + maybeLock(lock, {() => val idx = mmap.duplicate val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY) if(slot == -1) new OffsetPosition(baseOffset, 0) else parseEntry(idx, slot) - } + }) } /** @@ -102,14 +102,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl * such offset. */ def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = { - maybeLock(lock) { + maybeLock(lock, { () => val idx = mmap.duplicate val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE) if (slot == -1) None else Some(parseEntry(idx, slot)) - } + }) } private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize) @@ -126,12 +126,12 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl * @return The offset/position pair at that entry */ def entry(n: Int): OffsetPosition = { - maybeLock(lock) { - if (n >= _entries) + maybeLock(lock, { () => + if (n >= entries) throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " + - s"which has size ${_entries}.") + s"which has size $entries.") parseEntry(mmap, n) - } + }) } /** @@ -141,14 +141,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl */ def append(offset: Long, position: Int): Unit = { inLock(lock) { - require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") - if (_entries == 0 || offset > _lastOffset) { + require(!isFull, "Attempt to append to a full index (size = " + entries + ").") + if (entries == 0 || offset > _lastOffset) { trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") mmap.putInt(relativeOffset(offset)) mmap.putInt(position) - _entries += 1 + incrementEntries() _lastOffset = offset - require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.") + require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.") } else { throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") @@ -184,8 +184,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl */ private def truncateToEntries(entries: Int): Unit = { inLock(lock) { - _entries = entries - mmap.position(_entries * entrySize) + super.truncateToEntries0(entries) _lastOffset = lastEntry.offset 
debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" + s" position is now ${mmap.position()} and last offset is now ${_lastOffset}") @@ -193,7 +192,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl } override def sanityCheck(): Unit = { - if (_entries != 0 && _lastOffset < baseOffset) + if (entries != 0 && _lastOffset < baseOffset) throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " + s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.") if (length % entrySize != 0) diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index 6b22e2f3eafa8..f5ffc7f6c5291 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -23,7 +23,7 @@ import kafka.utils.CoreUtils.inLock import kafka.utils.Logging import org.apache.kafka.common.errors.InvalidOffsetException import org.apache.kafka.common.record.RecordBatch -import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset} +import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, TimestampOffset} /** * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be @@ -59,7 +59,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: override def entrySize = 12 debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," + - s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = ${mmap.position()}") + s" entries = $entries, lastOffset = ${_lastEntry}, file position = ${mmap.position()}") // We override the full check to reserve the last time index entry slot for the on roll call. override def isFull: Boolean = entries >= maxEntries - 1 @@ -75,7 +75,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: */ private def lastEntryFromIndexFile: TimestampOffset = { inLock(lock) { - _entries match { + entries match { case 0 => new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) case s => parseEntry(mmap, s - 1) } @@ -88,12 +88,12 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: * @return The timestamp/offset pair at that entry */ def entry(n: Int): TimestampOffset = { - maybeLock(lock) { - if(n >= _entries) + maybeLock(lock, { () => + if(n >= entries) throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from time index ${file.getAbsolutePath} " + - s"which has size ${_entries}.") + s"which has size $entries.") parseEntry(mmap, n) - } + }) } override def parseEntry(buffer: ByteBuffer, n: Int): TimestampOffset = { @@ -113,18 +113,18 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = { inLock(lock) { if (!skipFullCheck) - require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").") + require(!isFull, "Attempt to append to a full time index (size = " + entries + ").") // We do not throw exception when the offset equals to the offset of last entry. That means we are trying // to insert the same time index entry as the last entry. 
// If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion // because that could happen in the following two scenarios: // 1. A log segment is closed. // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled. - if (_entries != 0 && offset < lastEntry.offset) - throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" + + if (entries != 0 && offset < lastEntry.offset) + throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot $entries no larger than" + s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.") - if (_entries != 0 && timestamp < lastEntry.timestamp) - throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" + + if (entries != 0 && timestamp < lastEntry.timestamp) + throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot $entries no larger" + s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.") // We only append to the time index when the timestamp is greater than the last inserted timestamp. // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time @@ -133,9 +133,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.") mmap.putLong(timestamp) mmap.putInt(relativeOffset(offset)) - _entries += 1 + incrementEntries() _lastEntry = new TimestampOffset(timestamp, offset) - require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.") + require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.") } } } @@ -149,14 +149,14 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: * @return The time index entry found. 
*/ def lookup(targetTimestamp: Long): TimestampOffset = { - maybeLock(lock) { + maybeLock(lock, {() => val idx = mmap.duplicate val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY) if (slot == -1) new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) else parseEntry(idx, slot) - } + }) } override def truncate(): Unit = truncateToEntries(0) @@ -201,8 +201,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: */ private def truncateToEntries(entries: Int): Unit = { inLock(lock) { - _entries = entries - mmap.position(_entries * entrySize) + super.truncateToEntries0(entries) _lastEntry = lastEntryFromIndexFile debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}") } @@ -211,11 +210,11 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: override def sanityCheck(): Unit = { val lastTimestamp = lastEntry.timestamp val lastOffset = lastEntry.offset - if (_entries != 0 && lastTimestamp < timestamp(mmap, 0)) + if (entries != 0 && lastTimestamp < timestamp(mmap, 0)) throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " + s"non-zero size but the last timestamp is $lastTimestamp which is less than the first timestamp " + s"${timestamp(mmap, 0)}") - if (_entries != 0 && lastOffset < baseOffset) + if (entries != 0 && lastOffset < baseOffset) throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " + s"non-zero size but the last offset is $lastOffset which is less than the first offset $baseOffset") if (length % entrySize != 0) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 47932a1313c5d..93702b7b9dbdd 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -322,6 +322,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + + + diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java new file mode 100644 index 0000000000000..77e86a33fd90b --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.utils.ByteBufferUnmapper; +import org.apache.kafka.common.utils.OperatingSystem; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The abstract index class which holds entry format agnostic methods. + */ +public abstract class AbstractIndex implements Closeable { + + private static class BinarySearchResult { + public final int largestLowerBound; + public final int smallestUpperBound; + + private BinarySearchResult(int largestLowerBound, int smallestUpperBound) { + this.largestLowerBound = largestLowerBound; + this.smallestUpperBound = smallestUpperBound; + } + } + + private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); + + protected final ReentrantLock lock = new ReentrantLock(); + + private final long baseOffset; + private final int maxIndexSize; + private final boolean writable; + + private volatile File file; + + // Length of the index file + private volatile long length; + + private volatile MappedByteBuffer mmap; + + /** + * The maximum number of entries this index can hold + */ + private volatile int maxEntries; + /** The number of entries in this index */ + private volatile int entries; + + + /** + * @param file The index file + * @param baseOffset the base offset of the segment that this index is corresponding to. + * @param maxIndexSize The maximum index size in bytes. + */ + public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException { + Objects.requireNonNull(file); + this.file = file; + this.baseOffset = baseOffset; + this.maxIndexSize = maxIndexSize; + this.writable = writable; + + createAndAssignMmap(); + this.maxEntries = mmap.limit() / entrySize(); + this.entries = mmap.position() / entrySize(); + } + + private void createAndAssignMmap() throws IOException { + boolean newlyCreated = file.createNewFile(); + RandomAccessFile raf; + if (writable) + raf = new RandomAccessFile(file, "rw"); + else + raf = new RandomAccessFile(file, "r"); + + try { + /* pre-allocate the file if necessary */ + if (newlyCreated) { + if (maxIndexSize < entrySize()) + throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize); + raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize())); + } + + long length = raf.length(); + MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize()); + + this.length = length; + this.mmap = mmap; + } finally { + Utils.closeQuietly(raf, "index " + file.getName()); + } + } + + /** + * Do a basic sanity check on this index to detect obvious problems + * + * @throws CorruptIndexException if any problems are found + */ + public abstract void sanityCheck(); + + /** + * Remove all entries from the index which have an offset greater than or equal to the given offset. + * Truncating to an offset larger than the largest in the index has no effect. + */ + public abstract void truncateTo(long offset); + + /** + * Remove all the entries from the index. 
+ */ + protected abstract void truncate(); + + protected abstract int entrySize(); + + + /** + * To parse an entry in the index. + * + * @param buffer the buffer of this memory mapped index. + * @param n the slot + * @return the index entry stored in the given slot. + */ + protected abstract IndexEntry parseEntry(ByteBuffer buffer, int n); + + /** + * True iff there are no more slots available in this index + */ + public boolean isFull() { + return entries >= maxEntries; + } + + public File file() { + return this.file; + } + + public int maxEntries() { + return this.maxEntries; + } + + public int entries() { + return this.entries; + } + + public long length() { + return this.length; + } + + public int maxIndexSize() { + return maxIndexSize; + } + + public long baseOffset() { + return baseOffset; + } + + public void updateParentDir(File parentDir) { + this.file = new File(parentDir, file.getName()); + } + + /** + * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in + * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at + * loading segments from disk or truncating back to an old segment where a new log segment became active; + * we want to reset the index size to maximum index size to avoid rolling new segment. + * + * @param newSize new size of the index file + * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not. + */ + public boolean resize(int newSize) throws IOException { + lock.lock(); + try { + int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); + + if (length == roundedNewSize) { + log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize); + return false; + } else { + RandomAccessFile raf = new RandomAccessFile(file, "rw"); + try { + int position = mmap.position(); + + /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */ + if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) + safeForceUnmap(); + raf.setLength(roundedNewSize); + this.length = roundedNewSize; + mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize); + this.maxEntries = mmap.limit() / entrySize(); + mmap.position(position); + log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize, + mmap.position(), mmap.limit()); + return true; + } finally { + Utils.closeQuietly(raf, "index file " + file.getName()); + } + } + } finally { + lock.unlock(); + } + } + + /** + * Rename the file that backs this offset index + * + * @throws IOException if rename fails + */ + public void renameTo(File f) throws IOException { + try { + Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false); + } finally { + this.file = f; + } + } + + /** + * Flush the data in the index to disk + */ + public void flush() { + lock.lock(); + try { + mmap.force(); + } finally { + lock.unlock(); + } + } + + /** + * Delete this index file. + * + * @throws IOException if deletion fails due to an I/O error + * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did + * not exist + */ + public boolean deleteIfExists() throws IOException { + closeHandler(); + return Files.deleteIfExists(file.toPath()); + } + + /** + * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from + * the file. 
+ */ + public void trimToValidSize() throws IOException { + lock.lock(); + try { + resize(entrySize() * entries); + } finally { + lock.unlock(); + } + } + + /** + * The number of bytes actually used by this index + */ + public int sizeInBytes() { + return entrySize() * entries; + } + + public void close() throws IOException { + trimToValidSize(); + closeHandler(); + } + + public void closeHandler() { + // On JVM, a memory mapping is typically unmapped by garbage collector. + // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. + // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. + // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. + lock.lock(); + try { + safeForceUnmap(); + } finally { + lock.unlock(); + } + } + + /** + * Remove all the entries from the index and resize the index to the max index size. + */ + public void reset() throws IOException { + truncate(); + resize(maxIndexSize); + } + + /** + * Get offset relative to base offset of this index + * @throws IndexOffsetOverflowException + */ + public int relativeOffset(long offset) { + OptionalInt relativeOffset = toRelative(offset); + return relativeOffset.orElseThrow(() -> new IndexOffsetOverflowException( + "Integer overflow for offset: " + offset + " (" + file.getAbsoluteFile() + ")")); + } + + /** + * Check if a particular offset is valid to be appended to this index. + * @param offset The offset to check + * @return true if this offset is valid to be appended to this index; false otherwise + */ + public boolean canAppendOffset(long offset) { + return toRelative(offset).isPresent(); + } + + protected final MappedByteBuffer mmap() { + return mmap; + } + + /* + * Kafka mmaps index files into memory, and all the read / write operations of the index is through OS page cache. This + * avoids blocked disk I/O in most cases. + * + * To the extent of our knowledge, all the modern operating systems use LRU policy or its variants to manage page + * cache. Kafka always appends to the end of the index file, and almost all the index lookups (typically from in-sync + * followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work very + * well with Kafka's index access pattern. + * + * However, when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary + * page faults (the thread is blocked to wait for reading some index entries from hard disk, as those entries are not + * cached in the page cache). + * + * For example, in an index with 13 pages, to lookup an entry in the last page (page #12), the standard binary search + * algorithm will read index entries in page #0, 6, 9, 11, and 12. + * page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 | + * steps: |1| | | | | |3| | |4| |5 |2/6| + * In each page, there are hundreds log entries, corresponding to hundreds to thousands of kafka messages. When the + * index gradually growing from the 1st entry in page #12 to the last entry in page #12, all the write (append) + * operations are in page #12, and all the in-sync follower / consumer lookups read page #0,6,9,11,12. As these pages + * are always used in each in-sync lookup, we can assume these pages are fairly recently used, and are very likely to be + * in the page cache. 
When the index grows to page #13, the pages needed in a in-sync lookup change to #0, 7, 10, 12, + * and 13: + * page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 | + * steps: |1| | | | | | |3| | | 4|5 | 6|2/7| + * Page #7 and page #10 have not been used for a very long time. They are much less likely to be in the page cache, than + * the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is likely to have to read page #7 + * and page #10 from disk (page fault), which can take up to more than a second. In our test, this can cause the + * at-least-once produce latency to jump to about 1 second from a few ms. + * + * Here, we use a more cache-friendly lookup algorithm: + * if (target > indexEntry[end - N]) // if the target is in the last N entries of the index + * binarySearch(end - N, end) + * else + * binarySearch(begin, end - N) + * + * If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync + * lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in this + * relatively small section, the pages containing this section are more likely to be in the page cache. + * + * We set N (_warmEntries) to 8192, because + * 1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section + * lookup. So that, the entire warm section is really "warm". + * When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N), + * and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we + * touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS, + * SPARC, Power, ARM etc.). + * 2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka + * settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages. + * + * We can't set make N (_warmEntries) to be larger than 8192, as there is no simple way to guarantee all the "warm" + * section pages are really warm (touched in every lookup) on a typical 4KB-page host. + * + * In there future, we may use a backend thread to periodically touch the entire warm section. So that, we can + * 1) support larger warm section + * 2) make sure the warm section of low QPS topic-partitions are really warm. + */ + protected final int warmEntries() { + return 8192 / entrySize(); + } + + protected void safeForceUnmap() { + if (mmap != null) { + try { + forceUnmap(); + } catch (Throwable t) { + log.error("Error unmapping index {}", file, t); + } + } + } + + /** + * Forcefully free the buffer's mmap. + */ + // Visible for testing, we can make this protected once OffsetIndexTest is in the same package as this class + public void forceUnmap() throws IOException { + try { + ByteBufferUnmapper.unmap(file.getAbsolutePath(), mmap); + } finally { + mmap = null; + } + } + + // GuardedBy `lock` + protected void incrementEntries() { + ++entries; + } + + protected void truncateToEntries0(int entries) { + this.entries = entries; + mmap.position(entries * entrySize()); + } + + /** + * Execute the given function in a lock only if we are running on windows or z/OS. We do this + * because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it + * and this requires synchronizing reads. 
+ */ + protected final T maybeLock(Lock lock, StorageAction action) throws E { + if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) + lock.lock(); + try { + return action.execute(); + } finally { + if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) + lock.unlock(); + } + } + + /** + * Find the slot in which the largest entry less than or equal to the given target key or value is stored. + * The comparison is made using the `IndexEntry.compareTo()` method. + * + * @param idx The index buffer + * @param target The index key to look for + * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty + */ + protected int largestLowerBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) { + return indexSlotRangeFor(idx, target, searchEntity).largestLowerBound; + } + + /** + * Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned. + */ + protected int smallestUpperBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) { + return indexSlotRangeFor(idx, target, searchEntity).smallestUpperBound; + } + + /** + * Round a number to the greatest exact multiple of the given factor less than the given number. + * E.g. roundDownToExactMultiple(67, 8) == 64 + */ + private static int roundDownToExactMultiple(int number, int factor) { + return factor * (number / factor); + } + + private static MappedByteBuffer createMappedBuffer(RandomAccessFile raf, boolean newlyCreated, long length, + boolean writable, int entrySize) throws IOException { + MappedByteBuffer idx; + if (writable) + idx = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, length); + else + idx = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, length); + + /* set the position in the index for the next entry */ + if (newlyCreated) + idx.position(0); + else + // if this is a pre-existing index, assume it is valid and set position to last entry + idx.position(roundDownToExactMultiple(idx.limit(), entrySize)); + + return idx; + } + + /** + * Lookup lower and upper bounds for the given target. 
+ */ + private BinarySearchResult indexSlotRangeFor(ByteBuffer idx, long target, IndexSearchType searchEntity) { + // check if the index is empty + if (entries == 0) + return new BinarySearchResult(-1, -1); + + int firstHotEntry = Math.max(0, entries - 1 - warmEntries()); + // check if the target offset is in the warm section of the index + if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) { + return binarySearch(idx, target, searchEntity, firstHotEntry, entries - 1); + } + + // check if the target offset is smaller than the least offset + if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) + return new BinarySearchResult(-1, 0); + + return binarySearch(idx, target, searchEntity, 0, firstHotEntry); + } + + private BinarySearchResult binarySearch(ByteBuffer idx, long target, IndexSearchType searchEntity, int begin, int end) { + // binary search for the entry + int lo = begin; + int hi = end; + while (lo < hi) { + int mid = (lo + hi + 1) >>> 1; + IndexEntry found = parseEntry(idx, mid); + int compareResult = compareIndexEntry(found, target, searchEntity); + if (compareResult > 0) + hi = mid - 1; + else if (compareResult < 0) + lo = mid; + else + return new BinarySearchResult(mid, mid); + } + int blah; + if (lo == entries - 1) + blah = -1; + else + blah = lo + 1; + return new BinarySearchResult(lo, blah); + } + + private int compareIndexEntry(IndexEntry indexEntry, long target, IndexSearchType searchEntity) { + int result; + switch (searchEntity) { + case KEY: + result = Long.compare(indexEntry.indexKey(), target); + break; + case VALUE: + result = Long.compare(indexEntry.indexValue(), target); + break; + default: + throw new IllegalStateException("Unexpected IndexSearchType: " + searchEntity); + } + return result; + } + + private OptionalInt toRelative(long offset) { + long relativeOffset = offset - baseOffset; + if (relativeOffset < 0 || relativeOffset > Integer.MAX_VALUE) + return OptionalInt.empty(); + else + return OptionalInt.of((int) relativeOffset); + } + +} diff --git a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java similarity index 61% rename from core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala rename to storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java index 5dd9b43e9e843..aa2c0a590f85b 100644 --- a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala +++ b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,12 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. 
  */
+package org.apache.kafka.server.log.internals;
 
-package kafka.common
+import org.apache.kafka.common.KafkaException;
 
 /**
  * Indicates that an attempt was made to append a message whose offset could cause the index offset to overflow.
  */
-class IndexOffsetOverflowException(message: String, cause: Throwable) extends org.apache.kafka.common.KafkaException(message, cause) {
-  def this(message: String) = this(message, null)
+public class IndexOffsetOverflowException extends KafkaException {
+
+    public IndexOffsetOverflowException(String message) {
+        super(message);
+    }
+
+    public IndexOffsetOverflowException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/IndexSearchType.java b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexSearchType.java
new file mode 100644
index 0000000000000..2a6536466de49
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexSearchType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+public enum IndexSearchType {
+    KEY, VALUE;
+}

From c4ec833c375fdbd7f10b8b6ccb1f794601bf9bc0 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Mon, 19 Dec 2022 14:51:09 -0800
Subject: [PATCH 2/2] Address review comments

---
 .../apache/kafka/server/log/internals/AbstractIndex.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java
index 77e86a33fd90b..36e7e50be079a 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java
@@ -412,7 +412,7 @@ public void forceUnmap() throws IOException {
         }
     }
 
-    // GuardedBy `lock`
+    // The caller is expected to hold `lock` when calling this method
     protected void incrementEntries() {
         ++entries;
     }
@@ -519,12 +519,11 @@ else if (compareResult < 0)
             else
                 return new BinarySearchResult(mid, mid);
         }
-        int blah;
         if (lo == entries - 1)
-            blah = -1;
+            hi = -1;
         else
-            blah = lo + 1;
-        return new BinarySearchResult(lo, blah);
+            hi = lo + 1;
+        return new BinarySearchResult(lo, hi);
     }
 
     private int compareIndexEntry(IndexEntry indexEntry, long target, IndexSearchType searchEntity) {
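
A few standalone sketches may help when reviewing the ideas the moved code relies on. First, the cache-friendly "warm section" lookup described in the long comment block in AbstractIndex: probe the last N entries first, and only fall back to the cold section when the target is older. The class below is a minimal model with illustrative names (WarmSectionLookup, keys and WARM_ENTRIES are not identifiers from the patch); it treats entries as plain sorted longs rather than mmap-backed slots:

    // Minimal model of the warm-section lookup in AbstractIndex (illustrative only).
    public class WarmSectionLookup {
        // Pretend each index entry is just a long key, kept sorted as Kafka appends.
        private final long[] keys;
        // In AbstractIndex this is 8192 / entrySize(); hard-coded small for the sketch.
        private static final int WARM_ENTRIES = 4;

        public WarmSectionLookup(long[] keys) {
            this.keys = keys;
        }

        // Returns the slot of the largest key <= target, or -1 (mirrors largestLowerBoundSlotFor).
        public int largestLowerBoundSlot(long target) {
            if (keys.length == 0)
                return -1;
            int firstWarmSlot = Math.max(0, keys.length - 1 - WARM_ENTRIES);
            // Most lookups from in-sync followers/consumers hit the tail of the index,
            // so probe the warm section first; its pages are very likely in the page cache.
            if (keys[firstWarmSlot] <= target)
                return binarySearch(firstWarmSlot, keys.length - 1, target);
            if (keys[0] > target)
                return -1;
            return binarySearch(0, firstWarmSlot, target);
        }

        private int binarySearch(int lo, int hi, long target) {
            while (lo < hi) {
                int mid = (lo + hi + 1) >>> 1; // round up so the loop always terminates
                if (keys[mid] <= target)
                    lo = mid;
                else
                    hi = mid - 1;
            }
            return lo;
        }

        public static void main(String[] args) {
            WarmSectionLookup idx = new WarmSectionLookup(new long[] {0, 10, 20, 30, 40, 50, 60, 70});
            System.out.println(idx.largestLowerBoundSlot(65)); // 6 -> found in the warm section
            System.out.println(idx.largestLowerBoundSlot(15)); // 1 -> cold-section search
        }
    }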
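Second, the slot arithmetic that parseEntry and friends perform. The OffsetIndex hunks above show that an entry is written as two 4-byte ints (putInt(relativeOffset); putInt(position)) and read back with buffer.getInt(n * entrySize). A self-contained sketch of that layout (OffsetIndexLayout, ENTRY_SIZE and appendEntry are illustrative names; a heap ByteBuffer stands in for the mmap):

    import java.nio.ByteBuffer;

    // Sketch of the fixed-width slot layout behind parseEntry/largestLowerBoundSlotFor:
    // an offset-index entry is (relativeOffset: int, position: int) = 8 bytes.
    public class OffsetIndexLayout {
        private static final int ENTRY_SIZE = 8;

        public static void main(String[] args) {
            long baseOffset = 100L;
            ByteBuffer mmapLike = ByteBuffer.allocate(3 * ENTRY_SIZE); // stand-in for the mmap

            // Append three entries the way OffsetIndex.append does (putInt + putInt).
            appendEntry(mmapLike, 0, 0);      // offset 100 -> file position 0
            appendEntry(mmapLike, 50, 4096);  // offset 150 -> file position 4096
            appendEntry(mmapLike, 90, 8192);  // offset 190 -> file position 8192

            // Random access to slot n is just integer arithmetic, like parseEntry.
            int n = 1;
            long offset = baseOffset + mmapLike.getInt(n * ENTRY_SIZE);
            int position = mmapLike.getInt(n * ENTRY_SIZE + 4);
            System.out.println("slot " + n + ": offset=" + offset + " position=" + position);
        }

        private static void appendEntry(ByteBuffer buffer, int relativeOffset, int position) {
            buffer.putInt(relativeOffset);
            buffer.putInt(position);
        }
    }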
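Third, the overflow rule that relativeOffset/canAppendOffset/toRelative enforce and that IndexOffsetOverflowException reports: an offset can only be appended if offset - baseOffset fits in a non-negative int, because that is all the 4-byte slot can hold. A sketch of just that check (class and method names are illustrative, the arithmetic mirrors AbstractIndex.toRelative):

    import java.util.OptionalInt;

    // Standalone sketch of the relative-offset arithmetic (not code from the patch).
    public class RelativeOffsets {
        public static void main(String[] args) {
            long baseOffset = 5_000_000_000L; // base offset of the segment

            // An offset close to the base fits in the 4 bytes the index stores per entry.
            System.out.println(toRelative(baseOffset, baseOffset + 1000));           // OptionalInt[1000]
            // An offset more than Integer.MAX_VALUE past the base cannot be stored;
            // AbstractIndex.relativeOffset() throws IndexOffsetOverflowException here.
            System.out.println(toRelative(baseOffset, baseOffset + 3_000_000_000L)); // OptionalInt.empty
        }

        // Mirrors AbstractIndex.toRelative: an empty result means "would overflow".
        static OptionalInt toRelative(long baseOffset, long offset) {
            long relative = offset - baseOffset;
            if (relative < 0 || relative > Integer.MAX_VALUE)
                return OptionalInt.empty();
            return OptionalInt.of((int) relative);
        }
    }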
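Finally, the callback change visible in the OffsetIndex and TimeIndex hunks: the Scala by-name signature maybeLock[T](lock: Lock)(fun: => T) becomes maybeLock(Lock, StorageAction<T, E>) in Java, so that checked exceptions can flow out of the lambda. The sketch below inlines a StorageAction-shaped interface on the assumption that the real one (in the storage module) is a single-method interface of this form; it locks unconditionally for brevity, whereas the real method only locks on Windows/z/OS:

    import java.io.IOException;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of the checked-exception-friendly callback pattern used by maybeLock.
    public class MaybeLockSketch {

        // Assumed shape of StorageAction: generic in both result and exception type.
        @FunctionalInterface
        interface StorageAction<T, E extends Exception> {
            T execute() throws E;
        }

        // Unlike java.util.function.Supplier, the generic E lets callers throw
        // checked exceptions (e.g. IOException) through the lambda.
        static <T, E extends Exception> T withLock(Lock lock, StorageAction<T, E> action) throws E {
            lock.lock();
            try {
                return action.execute();
            } finally {
                lock.unlock();
            }
        }

        public static void main(String[] args) throws IOException {
            Lock lock = new ReentrantLock();
            // The callback may throw a checked IOException; E is given explicitly here.
            String v = MaybeLockSketch.<String, IOException>withLock(lock, () -> "read under lock");
            System.out.println(v);
        }
    }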