diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
deleted file mode 100644
index 1c8032115d890..0000000000000
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ /dev/null
@@ -1,440 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io.{Closeable, File, RandomAccessFile}
-import java.nio.channels.FileChannel
-import java.nio.file.Files
-import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.concurrent.locks.{Lock, ReentrantLock}
-import kafka.common.IndexOffsetOverflowException
-import kafka.utils.CoreUtils.inLock
-import kafka.utils.{CoreUtils, Logging}
-import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils}
-import org.apache.kafka.server.log.internals.IndexEntry
-
-/**
- * The abstract index class which holds entry format agnostic methods.
- *
- * @param _file The index file
- * @param baseOffset the base offset of the segment that this index is corresponding to.
- * @param maxIndexSize The maximum index size in bytes.
- */
-abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
- val writable: Boolean) extends Closeable {
- import AbstractIndex._
-
- // Length of the index file
- @volatile
- private var _length: Long = _
- protected def entrySize: Int
-
- /*
- Kafka mmaps index files into memory, and all the read / write operations of the index is through OS page cache. This
- avoids blocked disk I/O in most cases.
-
- To the extent of our knowledge, all the modern operating systems use LRU policy or its variants to manage page
- cache. Kafka always appends to the end of the index file, and almost all the index lookups (typically from in-sync
- followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work very
- well with Kafka's index access pattern.
-
- However, when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary
- page faults (the thread is blocked to wait for reading some index entries from hard disk, as those entries are not
- cached in the page cache).
-
- For example, in an index with 13 pages, to lookup an entry in the last page (page #12), the standard binary search
- algorithm will read index entries in page #0, 6, 9, 11, and 12.
- page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
- steps: |1| | | | | |3| | |4| |5 |2/6|
- In each page, there are hundreds log entries, corresponding to hundreds to thousands of kafka messages. When the
- index gradually growing from the 1st entry in page #12 to the last entry in page #12, all the write (append)
- operations are in page #12, and all the in-sync follower / consumer lookups read page #0,6,9,11,12. As these pages
- are always used in each in-sync lookup, we can assume these pages are fairly recently used, and are very likely to be
- in the page cache. When the index grows to page #13, the pages needed in a in-sync lookup change to #0, 7, 10, 12,
- and 13:
- page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
- steps: |1| | | | | | |3| | | 4|5 | 6|2/7|
- Page #7 and page #10 have not been used for a very long time. They are much less likely to be in the page cache, than
- the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is likely to have to read page #7
- and page #10 from disk (page fault), which can take up to more than a second. In our test, this can cause the
- at-least-once produce latency to jump to about 1 second from a few ms.
-
- Here, we use a more cache-friendly lookup algorithm:
- if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
- binarySearch(end - N, end)
- else
- binarySearch(begin, end - N)
-
- If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync
- lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in this
- relatively small section, the pages containing this section are more likely to be in the page cache.
-
- We set N (_warmEntries) to 8192, because
- 1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section
- lookup. So that, the entire warm section is really "warm".
- When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N),
- and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we
- touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS,
- SPARC, Power, ARM etc.).
- 2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka
- settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages.
-
- We can't set make N (_warmEntries) to be larger than 8192, as there is no simple way to guarantee all the "warm"
- section pages are really warm (touched in every lookup) on a typical 4KB-page host.
-
- In there future, we may use a backend thread to periodically touch the entire warm section. So that, we can
- 1) support larger warm section
- 2) make sure the warm section of low QPS topic-partitions are really warm.
- */
- protected def _warmEntries: Int = 8192 / entrySize
-
- protected val lock = new ReentrantLock
-
- @volatile
- protected var mmap: MappedByteBuffer = {
- val newlyCreated = file.createNewFile()
- val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
- try {
- /* pre-allocate the file if necessary */
- if(newlyCreated) {
- if(maxIndexSize < entrySize)
- throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
- raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
- }
-
- /* memory-map the file */
- _length = raf.length()
- val idx = {
- if (writable)
- raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
- else
- raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
- }
- /* set the position in the index for the next entry */
- if(newlyCreated)
- idx.position(0)
- else
- // if this is a pre-existing index, assume it is valid and set position to last entry
- idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
- idx
- } finally {
- CoreUtils.swallow(raf.close(), AbstractIndex)
- }
- }
-
- /**
- * The maximum number of entries this index can hold
- */
- @volatile
- private[this] var _maxEntries: Int = mmap.limit() / entrySize
-
- /** The number of entries in this index */
- @volatile
- protected var _entries: Int = mmap.position() / entrySize
-
- /**
- * True iff there are no more slots available in this index
- */
- def isFull: Boolean = _entries >= _maxEntries
-
- def file: File = _file
-
- def maxEntries: Int = _maxEntries
-
- def entries: Int = _entries
-
- def length: Long = _length
-
- def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
-
- /**
- * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
- * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
- * loading segments from disk or truncating back to an old segment where a new log segment became active;
- * we want to reset the index size to maximum index size to avoid rolling new segment.
- *
- * @param newSize new size of the index file
- * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
- */
- def resize(newSize: Int): Boolean = {
- inLock(lock) {
- val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
-
- if (_length == roundedNewSize) {
- debug(s"Index ${file.getAbsolutePath} was not resized because it already has size $roundedNewSize")
- false
- } else {
- val raf = new RandomAccessFile(file, "rw")
- try {
- val position = mmap.position()
-
- /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
- if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
- safeForceUnmap()
- raf.setLength(roundedNewSize)
- _length = roundedNewSize
- mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
- _maxEntries = mmap.limit() / entrySize
- mmap.position(position)
- debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position is ${mmap.position()} " +
- s"and limit is ${mmap.limit()}")
- true
- } finally {
- CoreUtils.swallow(raf.close(), AbstractIndex)
- }
- }
- }
- }
-
- /**
- * Rename the file that backs this offset index
- *
- * @throws IOException if rename fails
- */
- def renameTo(f: File): Unit = {
- try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
- finally _file = f
- }
-
- /**
- * Flush the data in the index to disk
- */
- def flush(): Unit = {
- inLock(lock) {
- mmap.force()
- }
- }
-
- /**
- * Delete this index file.
- *
- * @throws IOException if deletion fails due to an I/O error
- * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
- * not exist
- */
- def deleteIfExists(): Boolean = {
- closeHandler()
- Files.deleteIfExists(file.toPath)
- }
-
- /**
- * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
- * the file.
- */
- def trimToValidSize(): Unit = {
- inLock(lock) {
- resize(entrySize * _entries)
- }
- }
-
- /**
- * The number of bytes actually used by this index
- */
- def sizeInBytes: Int = entrySize * _entries
-
- /** Close the index */
- def close(): Unit = {
- trimToValidSize()
- closeHandler()
- }
-
- def closeHandler(): Unit = {
- // On JVM, a memory mapping is typically unmapped by garbage collector.
- // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
- // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
- // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
- inLock(lock) {
- safeForceUnmap()
- }
- }
-
- /**
- * Do a basic sanity check on this index to detect obvious problems
- *
- * @throws CorruptIndexException if any problems are found
- */
- def sanityCheck(): Unit
-
- /**
- * Remove all the entries from the index.
- */
- protected def truncate(): Unit
-
- /**
- * Remove all entries from the index which have an offset greater than or equal to the given offset.
- * Truncating to an offset larger than the largest in the index has no effect.
- */
- def truncateTo(offset: Long): Unit
-
- /**
- * Remove all the entries from the index and resize the index to the max index size.
- */
- def reset(): Unit = {
- truncate()
- resize(maxIndexSize)
- }
-
- /**
- * Get offset relative to base offset of this index
- * @throws IndexOffsetOverflowException
- */
- def relativeOffset(offset: Long): Int = {
- val relativeOffset = toRelative(offset)
- if (relativeOffset.isEmpty)
- throw new IndexOffsetOverflowException(s"Integer overflow for offset: $offset (${file.getAbsoluteFile})")
- relativeOffset.get
- }
-
- /**
- * Check if a particular offset is valid to be appended to this index.
- * @param offset The offset to check
- * @return true if this offset is valid to be appended to this index; false otherwise
- */
- def canAppendOffset(offset: Long): Boolean = {
- toRelative(offset).isDefined
- }
-
- protected def safeForceUnmap(): Unit = {
- if (mmap != null) {
- try forceUnmap()
- catch {
- case t: Throwable => error(s"Error unmapping index $file", t)
- }
- }
- }
-
- /**
- * Forcefully free the buffer's mmap.
- */
- protected[log] def forceUnmap(): Unit = {
- try ByteBufferUnmapper.unmap(file.getAbsolutePath, mmap)
- finally mmap = null // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe
- }
-
- /**
- * Execute the given function in a lock only if we are running on windows or z/OS. We do this
- * because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
- * and this requires synchronizing reads.
- */
- protected def maybeLock[T](lock: Lock)(fun: => T): T = {
- if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
- lock.lock()
- try fun
- finally {
- if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
- lock.unlock()
- }
- }
-
- /**
- * To parse an entry in the index.
- *
- * @param buffer the buffer of this memory mapped index.
- * @param n the slot
- * @return the index entry stored in the given slot.
- */
- protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry
-
- /**
- * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
- * The comparison is made using the `IndexEntry.compareTo()` method.
- *
- * @param idx The index buffer
- * @param target The index key to look for
- * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
- */
- protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
- indexSlotRangeFor(idx, target, searchEntity)._1
-
- /**
- * Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned.
- */
- protected def smallestUpperBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
- indexSlotRangeFor(idx, target, searchEntity)._2
-
- /**
- * Lookup lower and upper bounds for the given target.
- */
- private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
- // check if the index is empty
- if(_entries == 0)
- return (-1, -1)
-
- def binarySearch(begin: Int, end: Int) : (Int, Int) = {
- // binary search for the entry
- var lo = begin
- var hi = end
- while(lo < hi) {
- val mid = (lo + hi + 1) >>> 1
- val found = parseEntry(idx, mid)
- val compareResult = compareIndexEntry(found, target, searchEntity)
- if(compareResult > 0)
- hi = mid - 1
- else if(compareResult < 0)
- lo = mid
- else
- return (mid, mid)
- }
- (lo, if (lo == _entries - 1) -1 else lo + 1)
- }
-
- val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
- // check if the target offset is in the warm section of the index
- if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
- return binarySearch(firstHotEntry, _entries - 1)
- }
-
- // check if the target offset is smaller than the least offset
- if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
- return (-1, 0)
-
- binarySearch(0, firstHotEntry)
- }
-
- private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchType): Int = {
- searchEntity match {
- case IndexSearchType.KEY => java.lang.Long.compare(indexEntry.indexKey, target)
- case IndexSearchType.VALUE => java.lang.Long.compare(indexEntry.indexValue, target)
- }
- }
-
- /**
- * Round a number to the greatest exact multiple of the given factor less than the given number.
- * E.g. roundDownToExactMultiple(67, 8) == 64
- */
- private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
-
- private def toRelative(offset: Long): Option[Int] = {
- val relativeOffset = offset - baseOffset
- if (relativeOffset < 0 || relativeOffset > Int.MaxValue)
- None
- else
- Some(relativeOffset.toInt)
- }
-
-}
-
-object AbstractIndex extends Logging {
- override val loggerName: String = classOf[AbstractIndex].getName
-}
-
-sealed trait IndexSearchType
-object IndexSearchType {
- case object KEY extends IndexSearchType
- case object VALUE extends IndexSearchType
-}
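The warm-section search described in the large comment above can be hard to follow in the abstract, so here is a minimal, self-contained Java sketch of the idea. The class and method names and the plain sorted long[] of keys are illustrative only; the real implementation searches the memory-mapped index buffer instead.

    // Illustrative sketch of the cache-friendly, warm-section binary search.
    public class WarmSectionSearch {
        private final long[] keys;     // sorted index keys, one per slot
        private final int warmEntries; // size of the "warm" tail, e.g. 8192 / entrySize

        public WarmSectionSearch(long[] keys, int warmEntries) {
            this.keys = keys;
            this.warmEntries = warmEntries;
        }

        /** Returns the largest slot whose key is <= target, or -1 if there is none. */
        public int largestLowerBound(long target) {
            int entries = keys.length;
            if (entries == 0)
                return -1;
            int firstHotEntry = Math.max(0, entries - 1 - warmEntries);
            // If the target lies in the warm tail, search only there: the few pages
            // backing the tail are touched on every lookup and stay in the page cache.
            if (keys[firstHotEntry] < target)
                return binarySearch(firstHotEntry, entries - 1, target);
            // Target is smaller than the least key: no lower bound exists.
            if (keys[0] > target)
                return -1;
            return binarySearch(0, firstHotEntry, target);
        }

        // Binary search biased upward so `lo` always advances when keys[mid] < target.
        private int binarySearch(int begin, int end, long target) {
            int lo = begin;
            int hi = end;
            while (lo < hi) {
                int mid = (lo + hi + 1) >>> 1;
                if (keys[mid] > target)
                    hi = mid - 1;
                else if (keys[mid] < target)
                    lo = mid;
                else
                    return mid;
            }
            return lo;
        }
    }

The key detail is the `(lo + hi + 1) >>> 1` midpoint: biasing the midpoint up guarantees that `lo` strictly advances on the `keys[mid] < target` branch, so the loop terminates while preserving the invariant that `keys[lo] <= target`.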
diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala
index 0aadb2a4c3819..d267883541203 100644
--- a/core/src/main/scala/kafka/log/LazyIndex.scala
+++ b/core/src/main/scala/kafka/log/LazyIndex.scala
@@ -20,11 +20,11 @@ package kafka.log
import java.io.File
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.locks.ReentrantLock
-
import LazyIndex._
import kafka.utils.CoreUtils.inLock
import kafka.utils.threadsafe
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.internals.AbstractIndex
/**
* A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 3b3ea461fbddf..9039f1d4e541d 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.errors.InvalidOffsetException
-import org.apache.kafka.server.log.internals.{CorruptIndexException, OffsetPosition}
+import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, OffsetPosition}
/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -60,14 +60,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
private[this] var _lastOffset = lastEntry.offset
debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " +
- s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")
+ s"maxIndexSize = $maxIndexSize, entries = $entries, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")
/**
* The last entry in the index
*/
private def lastEntry: OffsetPosition = {
inLock(lock) {
- _entries match {
+ entries match {
case 0 => new OffsetPosition(baseOffset, 0)
case s => parseEntry(mmap, s - 1)
}
@@ -86,14 +86,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* the pair (baseOffset, 0) is returned.
*/
def lookup(targetOffset: Long): OffsetPosition = {
- maybeLock(lock) {
+    maybeLock(lock, { () =>
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
if(slot == -1)
new OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
- }
+ })
}
/**
@@ -102,14 +102,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* such offset.
*/
def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = {
- maybeLock(lock) {
+ maybeLock(lock, { () =>
val idx = mmap.duplicate
val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE)
if (slot == -1)
None
else
Some(parseEntry(idx, slot))
- }
+ })
}
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
@@ -126,12 +126,12 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* @return The offset/position pair at that entry
*/
def entry(n: Int): OffsetPosition = {
- maybeLock(lock) {
- if (n >= _entries)
+ maybeLock(lock, { () =>
+ if (n >= entries)
throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
- s"which has size ${_entries}.")
+ s"which has size $entries.")
parseEntry(mmap, n)
- }
+ })
}
/**
@@ -141,14 +141,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
*/
def append(offset: Long, position: Int): Unit = {
inLock(lock) {
- require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
- if (_entries == 0 || offset > _lastOffset) {
+ require(!isFull, "Attempt to append to a full index (size = " + entries + ").")
+ if (entries == 0 || offset > _lastOffset) {
trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
mmap.putInt(relativeOffset(offset))
mmap.putInt(position)
- _entries += 1
+ incrementEntries()
_lastOffset = offset
- require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
+ require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
} else {
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
@@ -184,8 +184,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
*/
private def truncateToEntries(entries: Int): Unit = {
inLock(lock) {
- _entries = entries
- mmap.position(_entries * entrySize)
+ super.truncateToEntries0(entries)
_lastOffset = lastEntry.offset
debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" +
s" position is now ${mmap.position()} and last offset is now ${_lastOffset}")
@@ -193,7 +192,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
}
override def sanityCheck(): Unit = {
- if (_entries != 0 && _lastOffset < baseOffset)
+ if (entries != 0 && _lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " +
s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.")
if (length % entrySize != 0)
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 6b22e2f3eafa8..f5ffc7f6c5291 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -23,7 +23,7 @@ import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset}
+import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, TimestampOffset}
/**
* An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
@@ -59,7 +59,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
override def entrySize = 12
debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," +
- s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")
+ s" entries = $entries, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")
// We override the full check to reserve the last time index entry slot for the on roll call.
override def isFull: Boolean = entries >= maxEntries - 1
@@ -75,7 +75,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
*/
private def lastEntryFromIndexFile: TimestampOffset = {
inLock(lock) {
- _entries match {
+ entries match {
case 0 => new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
case s => parseEntry(mmap, s - 1)
}
@@ -88,12 +88,12 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
* @return The timestamp/offset pair at that entry
*/
def entry(n: Int): TimestampOffset = {
- maybeLock(lock) {
- if(n >= _entries)
+ maybeLock(lock, { () =>
+ if(n >= entries)
throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from time index ${file.getAbsolutePath} " +
- s"which has size ${_entries}.")
+ s"which has size $entries.")
parseEntry(mmap, n)
- }
+ })
}
override def parseEntry(buffer: ByteBuffer, n: Int): TimestampOffset = {
@@ -113,18 +113,18 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
inLock(lock) {
if (!skipFullCheck)
- require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
+ require(!isFull, "Attempt to append to a full time index (size = " + entries + ").")
// We do not throw exception when the offset equals to the offset of last entry. That means we are trying
// to insert the same time index entry as the last entry.
// If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
// because that could happen in the following two scenarios:
// 1. A log segment is closed.
// 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
- if (_entries != 0 && offset < lastEntry.offset)
- throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
+ if (entries != 0 && offset < lastEntry.offset)
+ throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot $entries no larger than" +
s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
- if (_entries != 0 && timestamp < lastEntry.timestamp)
- throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
+ if (entries != 0 && timestamp < lastEntry.timestamp)
+ throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot $entries no larger" +
s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
// We only append to the time index when the timestamp is greater than the last inserted timestamp.
// If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
@@ -133,9 +133,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
mmap.putLong(timestamp)
mmap.putInt(relativeOffset(offset))
- _entries += 1
+ incrementEntries()
_lastEntry = new TimestampOffset(timestamp, offset)
- require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
+ require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
}
}
}
@@ -149,14 +149,14 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
* @return The time index entry found.
*/
def lookup(targetTimestamp: Long): TimestampOffset = {
- maybeLock(lock) {
+    maybeLock(lock, { () =>
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
if (slot == -1)
new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
else
parseEntry(idx, slot)
- }
+ })
}
override def truncate(): Unit = truncateToEntries(0)
@@ -201,8 +201,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
*/
private def truncateToEntries(entries: Int): Unit = {
inLock(lock) {
- _entries = entries
- mmap.position(_entries * entrySize)
+ super.truncateToEntries0(entries)
_lastEntry = lastEntryFromIndexFile
debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}")
}
@@ -211,11 +210,11 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
override def sanityCheck(): Unit = {
val lastTimestamp = lastEntry.timestamp
val lastOffset = lastEntry.offset
- if (_entries != 0 && lastTimestamp < timestamp(mmap, 0))
+ if (entries != 0 && lastTimestamp < timestamp(mmap, 0))
throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
s"non-zero size but the last timestamp is $lastTimestamp which is less than the first timestamp " +
s"${timestamp(mmap, 0)}")
- if (_entries != 0 && lastOffset < baseOffset)
+ if (entries != 0 && lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
s"non-zero size but the last offset is $lastOffset which is less than the first offset $baseOffset")
if (length % entrySize != 0)
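Note the pattern in the OffsetIndex and TimeIndex hunks above: because the entry count is now a private field of the Java base class, the Scala subclasses no longer assign `_entries` directly. All mutation goes through the protected `incrementEntries()` and `truncateToEntries0(int)` helpers, while reads use the public `entries` accessor.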
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 47932a1313c5d..93702b7b9dbdd 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -322,6 +322,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+    <Match>
+        <!-- Suppress warnings about the lock in AbstractIndex.maybeLock, which is intentionally
+             acquired and released conditionally (only on Windows and z/OS). -->
+        <Class name="org.apache.kafka.server.log.internals.AbstractIndex"/>
+        <Method name="maybeLock"/>
+        <Bug pattern="UL_UNRELEASED_LOCK,UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
+    </Match>
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java
new file mode 100644
index 0000000000000..36e7e50be079a
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/AbstractIndex.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.utils.ByteBufferUnmapper;
+import org.apache.kafka.common.utils.OperatingSystem;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The abstract index class which holds entry format agnostic methods.
+ */
+public abstract class AbstractIndex implements Closeable {
+
+ private static class BinarySearchResult {
+ public final int largestLowerBound;
+ public final int smallestUpperBound;
+
+ private BinarySearchResult(int largestLowerBound, int smallestUpperBound) {
+ this.largestLowerBound = largestLowerBound;
+ this.smallestUpperBound = smallestUpperBound;
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
+
+ protected final ReentrantLock lock = new ReentrantLock();
+
+ private final long baseOffset;
+ private final int maxIndexSize;
+ private final boolean writable;
+
+ private volatile File file;
+
+ // Length of the index file
+ private volatile long length;
+
+ private volatile MappedByteBuffer mmap;
+
+ /**
+ * The maximum number of entries this index can hold
+ */
+ private volatile int maxEntries;
+ /** The number of entries in this index */
+ private volatile int entries;
+
+
+    /**
+     * @param file The index file
+     * @param baseOffset the base offset of the segment that this index corresponds to.
+     * @param maxIndexSize The maximum index size in bytes.
+     * @param writable whether the index is opened for read-write ("rw") or read-only ("r") access.
+     */
+ public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
+ Objects.requireNonNull(file);
+ this.file = file;
+ this.baseOffset = baseOffset;
+ this.maxIndexSize = maxIndexSize;
+ this.writable = writable;
+
+ createAndAssignMmap();
+ this.maxEntries = mmap.limit() / entrySize();
+ this.entries = mmap.position() / entrySize();
+ }
+
+ private void createAndAssignMmap() throws IOException {
+ boolean newlyCreated = file.createNewFile();
+ RandomAccessFile raf;
+ if (writable)
+ raf = new RandomAccessFile(file, "rw");
+ else
+ raf = new RandomAccessFile(file, "r");
+
+ try {
+ /* pre-allocate the file if necessary */
+ if (newlyCreated) {
+ if (maxIndexSize < entrySize())
+ throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize);
+ raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize()));
+ }
+
+ long length = raf.length();
+ MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize());
+
+ this.length = length;
+ this.mmap = mmap;
+ } finally {
+ Utils.closeQuietly(raf, "index " + file.getName());
+ }
+ }
+
+ /**
+ * Do a basic sanity check on this index to detect obvious problems
+ *
+ * @throws CorruptIndexException if any problems are found
+ */
+ public abstract void sanityCheck();
+
+ /**
+ * Remove all entries from the index which have an offset greater than or equal to the given offset.
+ * Truncating to an offset larger than the largest in the index has no effect.
+ */
+ public abstract void truncateTo(long offset);
+
+ /**
+ * Remove all the entries from the index.
+ */
+ protected abstract void truncate();
+
+ protected abstract int entrySize();
+
+
+ /**
+ * To parse an entry in the index.
+ *
+ * @param buffer the buffer of this memory mapped index.
+ * @param n the slot
+ * @return the index entry stored in the given slot.
+ */
+ protected abstract IndexEntry parseEntry(ByteBuffer buffer, int n);
+
+ /**
+ * True iff there are no more slots available in this index
+ */
+ public boolean isFull() {
+ return entries >= maxEntries;
+ }
+
+ public File file() {
+ return this.file;
+ }
+
+ public int maxEntries() {
+ return this.maxEntries;
+ }
+
+ public int entries() {
+ return this.entries;
+ }
+
+ public long length() {
+ return this.length;
+ }
+
+ public int maxIndexSize() {
+ return maxIndexSize;
+ }
+
+ public long baseOffset() {
+ return baseOffset;
+ }
+
+ public void updateParentDir(File parentDir) {
+ this.file = new File(parentDir, file.getName());
+ }
+
+    /**
+     * Reset the size of the memory map and the underlying file. This is used in two kinds of cases: (1) in
+     * trimToValidSize(), which is called when the segment is closed or a new segment is rolled; (2) when
+     * loading segments from disk or truncating back to an old segment where a new log segment became active;
+     * we want to reset the index size to the maximum index size to avoid rolling a new segment.
+     *
+     * @param newSize new size of the index file
+     * @return a boolean indicating whether the size of the memory map and the underlying file was changed.
+     */
+ public boolean resize(int newSize) throws IOException {
+ lock.lock();
+ try {
+ int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
+
+ if (length == roundedNewSize) {
+ log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
+ return false;
+ } else {
+ RandomAccessFile raf = new RandomAccessFile(file, "rw");
+ try {
+ int position = mmap.position();
+
+ /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
+ if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
+ safeForceUnmap();
+ raf.setLength(roundedNewSize);
+ this.length = roundedNewSize;
+ mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
+ this.maxEntries = mmap.limit() / entrySize();
+ mmap.position(position);
+ log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
+ mmap.position(), mmap.limit());
+ return true;
+ } finally {
+ Utils.closeQuietly(raf, "index file " + file.getName());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Rename the file that backs this offset index
+ *
+ * @throws IOException if rename fails
+ */
+ public void renameTo(File f) throws IOException {
+ try {
+ Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
+ } finally {
+ this.file = f;
+ }
+ }
+
+ /**
+ * Flush the data in the index to disk
+ */
+ public void flush() {
+ lock.lock();
+ try {
+ mmap.force();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Delete this index file.
+ *
+ * @throws IOException if deletion fails due to an I/O error
+ * @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
+ * not exist
+ */
+ public boolean deleteIfExists() throws IOException {
+ closeHandler();
+ return Files.deleteIfExists(file.toPath());
+ }
+
+ /**
+ * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
+ * the file.
+ */
+ public void trimToValidSize() throws IOException {
+ lock.lock();
+ try {
+ resize(entrySize() * entries);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * The number of bytes actually used by this index
+ */
+ public int sizeInBytes() {
+ return entrySize() * entries;
+ }
+
+ public void close() throws IOException {
+ trimToValidSize();
+ closeHandler();
+ }
+
+ public void closeHandler() {
+        // On JVM, a memory mapping is typically unmapped by the garbage collector.
+        // However, in some cases it can pause application threads (STW) for a long moment while reading metadata from a physical disk.
+        // To prevent this, we forcefully clean up the memory mapping at a point where it does not affect API responsiveness.
+ // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
+ lock.lock();
+ try {
+ safeForceUnmap();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Remove all the entries from the index and resize the index to the max index size.
+ */
+ public void reset() throws IOException {
+ truncate();
+ resize(maxIndexSize);
+ }
+
+ /**
+     * Get the offset relative to the base offset of this index.
+     * @throws IndexOffsetOverflowException if the relative offset does not fit in an int
+ */
+ public int relativeOffset(long offset) {
+ OptionalInt relativeOffset = toRelative(offset);
+ return relativeOffset.orElseThrow(() -> new IndexOffsetOverflowException(
+ "Integer overflow for offset: " + offset + " (" + file.getAbsoluteFile() + ")"));
+ }
+
+ /**
+ * Check if a particular offset is valid to be appended to this index.
+ * @param offset The offset to check
+ * @return true if this offset is valid to be appended to this index; false otherwise
+ */
+ public boolean canAppendOffset(long offset) {
+ return toRelative(offset).isPresent();
+ }
+
+ protected final MappedByteBuffer mmap() {
+ return mmap;
+ }
+
+ /*
+     * Kafka mmaps index files into memory, and all the read / write operations of the index go through the OS page cache. This
+ * avoids blocked disk I/O in most cases.
+ *
+ * To the extent of our knowledge, all the modern operating systems use LRU policy or its variants to manage page
+ * cache. Kafka always appends to the end of the index file, and almost all the index lookups (typically from in-sync
+ * followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work very
+ * well with Kafka's index access pattern.
+ *
+     * However, when looking up the index, the standard binary search algorithm is not cache friendly, and can cause unnecessary
+     * page faults (the thread is blocked waiting for some index entries to be read from hard disk, as those entries are not
+     * cached in the page cache).
+     *
+     * For example, in an index with 13 pages, to look up an entry in the last page (page #12), the standard binary search
+     * algorithm will read index entries in pages #0, 6, 9, 11, and 12.
+ * page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
+ * steps: |1| | | | | |3| | |4| |5 |2/6|
+     * In each page, there are hundreds of log entries, corresponding to hundreds to thousands of kafka messages. When the
+     * index gradually grows from the 1st entry in page #12 to the last entry in page #12, all the write (append)
+     * operations are in page #12, and all the in-sync follower / consumer lookups read page #0,6,9,11,12. As these pages
+     * are always used in each in-sync lookup, we can assume these pages are fairly recently used, and are very likely to be
+     * in the page cache. When the index grows to page #13, the pages needed in an in-sync lookup change to #0, 7, 10, 12,
+ * and 13:
+ * page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
+ * steps: |1| | | | | | |3| | | 4|5 | 6|2/7|
+     * Page #7 and page #10 have not been used for a very long time. They are much less likely to be in the page cache than
+     * the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is likely to have to read page #7
+     * and page #10 from disk (page fault), which can take more than a second. In our test, this can cause the
+ * at-least-once produce latency to jump to about 1 second from a few ms.
+ *
+ * Here, we use a more cache-friendly lookup algorithm:
+ * if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
+ * binarySearch(end - N, end)
+ * else
+ * binarySearch(begin, end - N)
+ *
+ * If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync
+ * lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in this
+ * relatively small section, the pages containing this section are more likely to be in the page cache.
+ *
+     * We set N (warmEntries) to 8192, because
+     * 1. This number is small enough to guarantee all the pages of the "warm" section are touched in every warm-section
+     *    lookup, so that the entire warm section is really "warm".
+     *    When doing a warm-section lookup, the following 3 entries are always touched: indexEntry(end), indexEntry(end-N),
+ * and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we
+ * touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS,
+ * SPARC, Power, ARM etc.).
+     * 2. This number is large enough to guarantee most of the in-sync lookups are in the warm section. With default Kafka
+     *    settings, an 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) of log messages.
+ *
+     * We can't set N (warmEntries) to be larger than 8192, as there is no simple way to guarantee all the "warm"
+     * section pages are really warm (touched in every lookup) on a typical 4KB-page host.
+     *
+     * In the future, we may use a background thread to periodically touch the entire warm section, so that we can
+     * 1) support a larger warm section
+     * 2) make sure the warm section of low-QPS topic-partitions is really warm.
+ */
+ protected final int warmEntries() {
+ return 8192 / entrySize();
+ }
+
+ protected void safeForceUnmap() {
+ if (mmap != null) {
+ try {
+ forceUnmap();
+ } catch (Throwable t) {
+ log.error("Error unmapping index {}", file, t);
+ }
+ }
+ }
+
+ /**
+ * Forcefully free the buffer's mmap.
+ */
+ // Visible for testing, we can make this protected once OffsetIndexTest is in the same package as this class
+ public void forceUnmap() throws IOException {
+ try {
+ ByteBufferUnmapper.unmap(file.getAbsolutePath(), mmap);
+ } finally {
+ mmap = null;
+ }
+ }
+
+ // The caller is expected to hold `lock` when calling this method
+ protected void incrementEntries() {
+ ++entries;
+ }
+
+ protected void truncateToEntries0(int entries) {
+ this.entries = entries;
+ mmap.position(entries * entrySize());
+ }
+
+ /**
+ * Execute the given function in a lock only if we are running on windows or z/OS. We do this
+ * because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
+ * and this requires synchronizing reads.
+ */
+    protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
+ if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
+ lock.lock();
+ try {
+ return action.execute();
+ } finally {
+ if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
+ * The comparison is made using the `IndexEntry.compareTo()` method.
+ *
+ * @param idx The index buffer
+ * @param target The index key to look for
+ * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
+ */
+ protected int largestLowerBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) {
+ return indexSlotRangeFor(idx, target, searchEntity).largestLowerBound;
+ }
+
+ /**
+ * Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned.
+ */
+ protected int smallestUpperBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) {
+ return indexSlotRangeFor(idx, target, searchEntity).smallestUpperBound;
+ }
+
+ /**
+ * Round a number to the greatest exact multiple of the given factor less than the given number.
+ * E.g. roundDownToExactMultiple(67, 8) == 64
+ */
+ private static int roundDownToExactMultiple(int number, int factor) {
+ return factor * (number / factor);
+ }
+
+ private static MappedByteBuffer createMappedBuffer(RandomAccessFile raf, boolean newlyCreated, long length,
+ boolean writable, int entrySize) throws IOException {
+ MappedByteBuffer idx;
+ if (writable)
+ idx = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, length);
+ else
+ idx = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, length);
+
+ /* set the position in the index for the next entry */
+ if (newlyCreated)
+ idx.position(0);
+ else
+ // if this is a pre-existing index, assume it is valid and set position to last entry
+ idx.position(roundDownToExactMultiple(idx.limit(), entrySize));
+
+ return idx;
+ }
+
+ /**
+ * Lookup lower and upper bounds for the given target.
+ */
+ private BinarySearchResult indexSlotRangeFor(ByteBuffer idx, long target, IndexSearchType searchEntity) {
+ // check if the index is empty
+ if (entries == 0)
+ return new BinarySearchResult(-1, -1);
+
+ int firstHotEntry = Math.max(0, entries - 1 - warmEntries());
+ // check if the target offset is in the warm section of the index
+ if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
+ return binarySearch(idx, target, searchEntity, firstHotEntry, entries - 1);
+ }
+
+ // check if the target offset is smaller than the least offset
+ if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
+ return new BinarySearchResult(-1, 0);
+
+ return binarySearch(idx, target, searchEntity, 0, firstHotEntry);
+ }
+
+ private BinarySearchResult binarySearch(ByteBuffer idx, long target, IndexSearchType searchEntity, int begin, int end) {
+ // binary search for the entry
+ int lo = begin;
+ int hi = end;
+ while (lo < hi) {
+ int mid = (lo + hi + 1) >>> 1;
+ IndexEntry found = parseEntry(idx, mid);
+ int compareResult = compareIndexEntry(found, target, searchEntity);
+ if (compareResult > 0)
+ hi = mid - 1;
+ else if (compareResult < 0)
+ lo = mid;
+ else
+ return new BinarySearchResult(mid, mid);
+ }
+ if (lo == entries - 1)
+ hi = -1;
+ else
+ hi = lo + 1;
+ return new BinarySearchResult(lo, hi);
+ }
+
+ private int compareIndexEntry(IndexEntry indexEntry, long target, IndexSearchType searchEntity) {
+ int result;
+ switch (searchEntity) {
+ case KEY:
+ result = Long.compare(indexEntry.indexKey(), target);
+ break;
+ case VALUE:
+ result = Long.compare(indexEntry.indexValue(), target);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected IndexSearchType: " + searchEntity);
+ }
+ return result;
+ }
+
+ private OptionalInt toRelative(long offset) {
+ long relativeOffset = offset - baseOffset;
+ if (relativeOffset < 0 || relativeOffset > Integer.MAX_VALUE)
+ return OptionalInt.empty();
+ else
+ return OptionalInt.of((int) relativeOffset);
+ }
+
+}
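One behavioural note on the port: `maybeLock` now takes a `StorageAction` rather than a Scala by-name parameter, which is why the call sites in OffsetIndex and TimeIndex change from `maybeLock(lock) { ... }` to `maybeLock(lock, { () => ... })` — the Scala lambda is converted to the Java functional interface. `StorageAction` itself does not appear in this diff; presumably it is a small functional interface along these lines (an assumption, not the actual definition):

    package org.apache.kafka.server.log.internals;

    // Presumed shape of StorageAction: a supplier-like action that can throw a
    // checked exception E. Referenced by AbstractIndex.maybeLock but not shown in this diff.
    @FunctionalInterface
    public interface StorageAction<T, E extends Exception> {
        T execute() throws E;
    }

Declaring the checked exception as a type parameter lets callers such as `resize` propagate `IOException` through the lambda without wrapping it.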
diff --git a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java
similarity index 61%
rename from core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala
rename to storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java
index 5dd9b43e9e843..aa2c0a590f85b 100644
--- a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexOffsetOverflowException.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -14,12 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.kafka.server.log.internals;
-package kafka.common
+import org.apache.kafka.common.KafkaException;
/**
* Indicates that an attempt was made to append a message whose offset could cause the index offset to overflow.
*/
-class IndexOffsetOverflowException(message: String, cause: Throwable) extends org.apache.kafka.common.KafkaException(message, cause) {
- def this(message: String) = this(message, null)
+public class IndexOffsetOverflowException extends KafkaException {
+
+ public IndexOffsetOverflowException(String message) {
+ super(message);
+ }
+
+ public IndexOffsetOverflowException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
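For context on when this exception fires: AbstractIndex stores offsets relative to the segment's base offset as 4-byte ints, so only offsets in `[baseOffset, baseOffset + Integer.MAX_VALUE]` can be appended. A tiny hypothetical demo mirroring the range check in `AbstractIndex.toRelative` (not part of the patch):

    public class OffsetOverflowDemo {
        // Mirrors toRelative: the relative offset must fit in a non-negative 4-byte int.
        static boolean canAppend(long baseOffset, long offset) {
            long relative = offset - baseOffset;
            return relative >= 0 && relative <= Integer.MAX_VALUE;
        }

        public static void main(String[] args) {
            long base = 1_000L;
            System.out.println(canAppend(base, base + Integer.MAX_VALUE));      // true
            System.out.println(canAppend(base, base + Integer.MAX_VALUE + 1L)); // false: relativeOffset would throw
        }
    }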
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/IndexSearchType.java b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexSearchType.java
new file mode 100644
index 0000000000000..2a6536466de49
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/IndexSearchType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+public enum IndexSearchType {
+ KEY, VALUE;
+}
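A note on the enum's two constants, based on the call sites in this diff: OffsetIndex and TimeIndex search by KEY when the target is an offset or a timestamp (`lookup`), and by VALUE when the target is a file position (`fetchUpperBoundOffset`), matching the `indexKey()` and `indexValue()` accessors of IndexEntry.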