Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 49 additions & 40 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package kafka.log

import java.io.{File, IOException}
import java.lang.{Long => JLong}
import java.nio.file.Files
import java.text.NumberFormat
import java.util.Map.{Entry => JEntry}
import java.util.Optional
import java.util.concurrent.atomic._
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -1181,10 +1179,10 @@ class Log(@volatile private var _dir: File,
// We create the local variables to avoid race conditions with updates to the log.
val endOffsetMetadata = nextOffsetMetadata
val endOffset = endOffsetMetadata.messageOffset
var segmentEntryOpt = segments.floorEntry(startOffset)
var segmentOpt = segments.floorSegment(startOffset)

// return error on attempt to read beyond the log end offset or read below log start offset
if (startOffset > endOffset || segmentEntryOpt.isEmpty || startOffset < logStartOffset)
if (startOffset > endOffset || segmentOpt.isEmpty || startOffset < logStartOffset)
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments in the range $logStartOffset to $endOffset.")

Expand All @@ -1202,12 +1200,10 @@ class Log(@volatile private var _dir: File,
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
var done = segmentEntryOpt.isEmpty
var fetchDataInfo: FetchDataInfo = null
while (!done) {
val segmentEntry = segmentEntryOpt.get
val baseOffset = segmentEntry.getKey
val segment = segmentEntry.getValue
while (fetchDataInfo == null && segmentOpt.isDefined) {
val segment = segmentOpt.get
val baseOffset = segment.baseOffset

val maxPosition =
// Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
Expand All @@ -1217,10 +1213,8 @@ class Log(@volatile private var _dir: File,
fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
if (fetchDataInfo != null) {
if (includeAbortedTxns)
fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
} else segmentEntryOpt = segments.higherEntry(baseOffset)

done = fetchDataInfo != null || segmentEntryOpt.isEmpty
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
} else segmentOpt = segments.higherSegment(baseOffset)
}

if (fetchDataInfo != null) fetchDataInfo
Expand All @@ -1235,25 +1229,25 @@ class Log(@volatile private var _dir: File,
}

private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
val segmentEntryOpt = segments.floorEntry(startOffset)
val segmentEntry = segments.floorSegment(startOffset)
val allAbortedTxns = ListBuffer.empty[AbortedTxn]
def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntryOpt.get, accumulator)
segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
allAbortedTxns.toList
}

private def addAbortedTransactions(startOffset: Long, segmentEntry: JEntry[JLong, LogSegment],
private def addAbortedTransactions(startOffset: Long, segment: LogSegment,
fetchInfo: FetchDataInfo): FetchDataInfo = {
val fetchSize = fetchInfo.records.sizeInBytes
val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
val upperBoundOffset = segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
segments.higherSegment(segmentEntry.getKey).map(_.baseOffset).getOrElse(logEndOffset)
val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
segments.higherSegment(segment.baseOffset).map(_.baseOffset).getOrElse(logEndOffset)
}

val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry, accumulator)
collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)

FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
records = fetchInfo.records,
Expand All @@ -1262,17 +1256,17 @@ class Log(@volatile private var _dir: File,
}

private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
startingSegment: LogSegment,
accumulator: List[AbortedTxn] => Unit): Unit = {
var segmentEntryOpt = Option(startingSegmentEntry)
val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
var segmentEntryOpt = Option(startingSegment)
while (segmentEntryOpt.isDefined) {
val baseOffset = segmentEntryOpt.get.getKey
val segment = segmentEntryOpt.get.getValue
val segment = segmentEntryOpt.get
val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
accumulator(searchResult.abortedTransactions)
if (searchResult.isComplete)
return
segmentEntryOpt = segments.higherEntry(baseOffset)
segmentEntryOpt = nextOption(higherSegments)
}
}

Expand Down Expand Up @@ -1438,23 +1432,23 @@ class Log(@volatile private var _dir: File,
Seq.empty
} else {
val deletable = ArrayBuffer.empty[LogSegment]
var segmentEntryOpt = segments.firstEntry
while (segmentEntryOpt.isDefined) {
val segmentEntry = segmentEntryOpt.get
val segment = segmentEntry.getValue
val nextSegmentEntryOpt = segments.higherEntry(segmentEntry.getKey)
val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) =
nextSegmentEntryOpt.map {
entry => (entry.getValue, entry.getValue.baseOffset, false)
val segmentsIterator = segments.values.iterator
var segmentOpt = nextOption(segmentsIterator)
while (segmentOpt.isDefined) {
val segment = segmentOpt.get
val nextSegmentOpt = nextOption(segmentsIterator)
val (upperBoundOffset: Long, isLastSegmentAndEmpty: Boolean) =
nextSegmentOpt.map {
nextSegment => (nextSegment.baseOffset, false)
}.getOrElse {
(null, logEndOffset, segment.size == 0)
(logEndOffset, segment.size == 0)
}

if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
if (highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
deletable += segment
segmentEntryOpt = nextSegmentEntryOpt
segmentOpt = nextSegmentOpt
} else {
segmentEntryOpt = Option.empty
segmentOpt = Option.empty
}
}
deletable
Expand Down Expand Up @@ -2450,11 +2444,11 @@ object Log extends Logging {
time: Time,
reloadFromCleanShutdown: Boolean,
logPrefix: String): Unit = {
val allSegments = segments.values
val offsetsToSnapshot =
if (allSegments.nonEmpty) {
val nextLatestSegmentBaseOffset = segments.lowerSegment(allSegments.last.baseOffset).map(_.baseOffset)
Seq(nextLatestSegmentBaseOffset, Some(allSegments.last.baseOffset), Some(lastOffset))
if (segments.nonEmpty) {
val lastSegmentBaseOffset = segments.lastSegment.get.baseOffset
val nextLatestSegmentBaseOffset = segments.lowerSegment(lastSegmentBaseOffset).map(_.baseOffset)
Seq(nextLatestSegmentBaseOffset, Some(lastSegmentBaseOffset), Some(lastOffset))
} else {
Seq(Some(lastOffset))
}
Expand Down Expand Up @@ -2609,6 +2603,21 @@ object Log extends Logging {
throw e
}
}

/**
* Wraps the value of iterator.next() in an option.
* Note: this facility is a part of the Iterator class starting from scala v2.13.
*
* @param iterator
* @tparam T the type of object held within the iterator
* @return Some(iterator.next) if a next element exists, None otherwise.
*/
private def nextOption[T](iterator: Iterator[T]): Option[T] = {
if (iterator.hasNext)
Some(iterator.next())
else
None
}
}

object LogMetricNames {
Expand Down
25 changes: 18 additions & 7 deletions core/src/main/scala/kafka/log/LogSegments.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package kafka.log

import java.io.File
import java.lang.{Long => JLong}
Comment thread
kowshik marked this conversation as resolved.
Outdated
import java.util.Map
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}

Expand All @@ -36,7 +35,7 @@ import scala.jdk.CollectionConverters._
class LogSegments(topicPartition: TopicPartition) {

/* the segments of the log with key being LogSegment base offset and value being a LogSegment */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
private val segments: ConcurrentNavigableMap[Long, LogSegment] = new ConcurrentSkipListMap[Long, LogSegment]

/**
* @return true if the segments are empty, false otherwise.
Expand Down Expand Up @@ -157,7 +156,7 @@ class LogSegments(topicPartition: TopicPartition) {
* if it exists.
*/
@threadsafe
def floorEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.floorEntry(offset))
private def floorEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.floorEntry(offset))

/**
* @return the log segment with the greatest offset less than or equal to the given offset,
Expand All @@ -171,7 +170,7 @@ class LogSegments(topicPartition: TopicPartition) {
* if it exists.
*/
@threadsafe
def lowerEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.lowerEntry(offset))
private def lowerEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.lowerEntry(offset))

/**
* @return the log segment with the greatest offset strictly less than the given offset,
Expand All @@ -185,7 +184,7 @@ class LogSegments(topicPartition: TopicPartition) {
* if it exists.
*/
@threadsafe
def higherEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.higherEntry(offset))
def higherEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.higherEntry(offset))
Comment thread
kowshik marked this conversation as resolved.
Outdated

/**
* @return the log segment with the smallest offset strictly greater than the given offset,
Expand All @@ -198,7 +197,7 @@ class LogSegments(topicPartition: TopicPartition) {
* @return the entry associated with the smallest offset, if it exists.
*/
@threadsafe
def firstEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.firstEntry)
def firstEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.firstEntry)

/**
* @return the log segment associated with the smallest offset, if it exists.
Expand All @@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) {
* @return the entry associated with the greatest offset, if it exists.
*/
@threadsafe
def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry)
def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry)

/**
* @return the log segment with the greatest offset, if it exists.
*/
@threadsafe
def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue)

/**
* @return an iterable with log segments ordered from lowest base offset to highest,
* each segment returned has a base offset strictly greater than the provided baseOffset.
*/
def higherSegments(baseOffset: Long): Iterable[LogSegment] = {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma If you think this API is a good fit to the requirement, I can add unit tests for it.

val view =
Option(segments.higherKey(baseOffset)).map {
higherOffset => segments.tailMap(higherOffset, true)
}.getOrElse(collection.immutable.Map[Long, LogSegment]().asJava)
view.values.asScala
}
}
59 changes: 52 additions & 7 deletions core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class LogSegmentsTest {
Utils.delete(logDir)
}

private def assertEntry(segment: LogSegment, tested: java.util.Map.Entry[java.lang.Long, LogSegment]): Unit = {
private def assertEntry(segment: LogSegment, tested: java.util.Map.Entry[Long, LogSegment]): Unit = {
assertEquals(segment.baseOffset, tested.getKey())
assertEquals(segment, tested.getValue())
}
Expand Down Expand Up @@ -158,17 +158,13 @@ class LogSegmentsTest {

List(seg1, seg2, seg3, seg4).foreach(segments.add)

// Test floorSegment, floorEntry
// Test floorSegment
assertEquals(Some(seg1), segments.floorSegment(2))
assertEntry(seg1, segments.floorEntry(2).get)
assertEquals(Some(seg2), segments.floorSegment(3))
assertEntry(seg2, segments.floorEntry(3).get)

// Test lowerSegment, lowerEntry
// Test lowerSegment
assertEquals(Some(seg1), segments.lowerSegment(3))
assertEntry(seg1, segments.lowerEntry(3).get)
assertEquals(Some(seg2), segments.lowerSegment(4))
assertEntry(seg2, segments.lowerEntry(4).get)

// Test higherSegment, higherEntry
assertEquals(Some(seg3), segments.higherSegment(4))
Expand All @@ -178,4 +174,53 @@ class LogSegmentsTest {

segments.close()
}

@Test
def testHigherSegments(): Unit = {
val segments = new LogSegments(topicPartition)

val seg1 = createSegment(1)
val seg2 = createSegment(3)
val seg3 = createSegment(5)
val seg4 = createSegment(7)
val seg5 = createSegment(9)

List(seg1, seg2, seg3, seg4, seg5).foreach(segments.add)

// higherSegments(0) should return all segments in order
{
val iterator = segments.higherSegments(0).iterator
List(seg1, seg2, seg3, seg4, seg5).foreach {
segment =>
assertTrue(iterator.hasNext)
assertEquals(segment, iterator.next())
}
assertFalse(iterator.hasNext)
}

// higherSegments(1) should return all segments in order except seg1
{
val iterator = segments.higherSegments(1).iterator
List(seg2, seg3, seg4, seg5).foreach {
segment =>
assertTrue(iterator.hasNext)
assertEquals(segment, iterator.next())
}
assertFalse(iterator.hasNext)
}

// higherSegments(8) should return only seg5
{
val iterator = segments.higherSegments(8).iterator
assertTrue(iterator.hasNext)
assertEquals(seg5, iterator.next())
assertFalse(iterator.hasNext)
}

// higherSegments(9) should return no segments
{
val iterator = segments.higherSegments(9).iterator
assertFalse(iterator.hasNext)
}
}
}