From f4bb513c97031657f3421fa6a17b7dfba7ab5be7 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Wed, 12 May 2021 16:52:46 -0700 Subject: [PATCH 1/6] MINOR: Improve Log layer segment iteration logic and few other areas --- core/src/main/scala/kafka/log/Log.scala | 73 +++++++++---------- .../main/scala/kafka/log/LogSegments.scala | 25 +++++-- .../unit/kafka/log/LogSegmentsTest.scala | 2 +- 3 files changed, 55 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 82e083f0c766a..bcd5b4aa4ec6a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -18,10 +18,8 @@ package kafka.log import java.io.{File, IOException} -import java.lang.{Long => JLong} import java.nio.file.Files import java.text.NumberFormat -import java.util.Map.{Entry => JEntry} import java.util.Optional import java.util.concurrent.atomic._ import java.util.concurrent.TimeUnit @@ -1181,10 +1179,10 @@ class Log(@volatile private var _dir: File, // We create the local variables to avoid race conditions with updates to the log. 
val endOffsetMetadata = nextOffsetMetadata val endOffset = endOffsetMetadata.messageOffset - var segmentEntryOpt = segments.floorEntry(startOffset) + var segmentOpt = segments.floorSegment(startOffset) // return error on attempt to read beyond the log end offset or read below log start offset - if (startOffset > endOffset || segmentEntryOpt.isEmpty || startOffset < logStartOffset) + if (startOffset > endOffset || segmentOpt.isEmpty || startOffset < logStartOffset) throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments in the range $logStartOffset to $endOffset.") @@ -1202,13 +1200,12 @@ class Log(@volatile private var _dir: File, // Do the read on the segment with a base offset less than the target offset // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log - var done = segmentEntryOpt.isEmpty + var done = segmentOpt.isEmpty var fetchDataInfo: FetchDataInfo = null + val segmentsIterator = segmentOpt.map(segment => segments.higherSegments(segment.baseOffset)) + .getOrElse(List()).iterator while (!done) { - val segmentEntry = segmentEntryOpt.get - val baseOffset = segmentEntry.getKey - val segment = segmentEntry.getValue - + val segment = segmentOpt.get val maxPosition = // Use the max offset position if it is on this segment; otherwise, the segment size is the limit. 
if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment @@ -1217,10 +1214,10 @@ class Log(@volatile private var _dir: File, fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage) if (fetchDataInfo != null) { if (includeAbortedTxns) - fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo) - } else segmentEntryOpt = segments.higherEntry(baseOffset) + fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo) + } else segmentOpt = segmentsIterator.nextOption() - done = fetchDataInfo != null || segmentEntryOpt.isEmpty + done = fetchDataInfo != null || segmentOpt.isEmpty } if (fetchDataInfo != null) fetchDataInfo @@ -1235,25 +1232,27 @@ class Log(@volatile private var _dir: File, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { - val segmentEntryOpt = segments.floorEntry(startOffset) + val segmentEntry = segments.floorSegment(startOffset).getOrElse { + throw new IllegalStateException(s"Could not find any segment with a base offset <= the given startOffset:$startOffset") + } val allAbortedTxns = ListBuffer.empty[AbortedTxn] def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns - collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntryOpt.get, accumulator) + collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntry, accumulator) allAbortedTxns.toList } - private def addAbortedTransactions(startOffset: Long, segmentEntry: JEntry[JLong, LogSegment], + private def addAbortedTransactions(startOffset: Long, segment: LogSegment, fetchInfo: FetchDataInfo): FetchDataInfo = { val fetchSize = fetchInfo.records.sizeInBytes val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) - val upperBoundOffset = 
segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { - segments.higherSegment(segmentEntry.getKey).map(_.baseOffset).getOrElse(logEndOffset) + val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { + segments.higherSegment(segment.baseOffset).map(_.baseOffset).getOrElse(logEndOffset) } val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction] def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) - collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry, accumulator) + collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator) FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata, records = fetchInfo.records, @@ -1262,17 +1261,17 @@ class Log(@volatile private var _dir: File, } private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long, - startingSegmentEntry: JEntry[JLong, LogSegment], + startingSegment: LogSegment, accumulator: List[AbortedTxn] => Unit): Unit = { - var segmentEntryOpt = Option(startingSegmentEntry) + val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator + var segmentEntryOpt = Option(startingSegment) while (segmentEntryOpt.isDefined) { - val baseOffset = segmentEntryOpt.get.getKey - val segment = segmentEntryOpt.get.getValue + val segment = segmentEntryOpt.get val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset) accumulator(searchResult.abortedTransactions) if (searchResult.isComplete) return - segmentEntryOpt = segments.higherEntry(baseOffset) + segmentEntryOpt = higherSegments.nextOption() } } @@ -1438,23 +1437,23 @@ class Log(@volatile private var _dir: File, Seq.empty } else { val deletable = ArrayBuffer.empty[LogSegment] - var segmentEntryOpt = segments.firstEntry - while (segmentEntryOpt.isDefined) { - val segmentEntry = segmentEntryOpt.get - val segment = 
segmentEntry.getValue - val nextSegmentEntryOpt = segments.higherEntry(segmentEntry.getKey) + val segmentsIterator = segments.values.iterator + var segmentOpt = segmentsIterator.nextOption() + while (segmentOpt.isDefined) { + val segment = segmentOpt.get + val nextSegmentOpt = segmentsIterator.nextOption() val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = - nextSegmentEntryOpt.map { - entry => (entry.getValue, entry.getValue.baseOffset, false) + nextSegmentOpt.map { + entry => (entry, entry.baseOffset, false) }.getOrElse { (null, logEndOffset, segment.size == 0) } - if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) { + if (highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) { deletable += segment - segmentEntryOpt = nextSegmentEntryOpt + segmentOpt = nextSegmentOpt } else { - segmentEntryOpt = Option.empty + segmentOpt = Option.empty } } deletable @@ -2450,11 +2449,11 @@ object Log extends Logging { time: Time, reloadFromCleanShutdown: Boolean, logPrefix: String): Unit = { - val allSegments = segments.values val offsetsToSnapshot = - if (allSegments.nonEmpty) { - val nextLatestSegmentBaseOffset = segments.lowerSegment(allSegments.last.baseOffset).map(_.baseOffset) - Seq(nextLatestSegmentBaseOffset, Some(allSegments.last.baseOffset), Some(lastOffset)) + if (segments.nonEmpty) { + val lastSegmentBaseOffset = segments.lastSegment.get.baseOffset + val nextLatestSegmentBaseOffset = segments.lowerSegment(lastSegmentBaseOffset).map(_.baseOffset) + Seq(nextLatestSegmentBaseOffset, Some(lastSegmentBaseOffset), Some(lastOffset)) } else { Seq(Some(lastOffset)) } diff --git a/core/src/main/scala/kafka/log/LogSegments.scala b/core/src/main/scala/kafka/log/LogSegments.scala index d9e564ed4b13d..d420adc3e7279 100644 --- a/core/src/main/scala/kafka/log/LogSegments.scala +++ b/core/src/main/scala/kafka/log/LogSegments.scala @@ -17,7 +17,6 @@ package kafka.log import 
java.io.File -import java.lang.{Long => JLong} import java.util.Map import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} @@ -36,7 +35,7 @@ import scala.jdk.CollectionConverters._ class LogSegments(topicPartition: TopicPartition) { /* the segments of the log with key being LogSegment base offset and value being a LogSegment */ - private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] + private val segments: ConcurrentNavigableMap[Long, LogSegment] = new ConcurrentSkipListMap[Long, LogSegment] /** * @return true if the segments are empty, false otherwise. @@ -157,7 +156,7 @@ class LogSegments(topicPartition: TopicPartition) { * if it exists. */ @threadsafe - def floorEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.floorEntry(offset)) + def floorEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.floorEntry(offset)) /** * @return the log segment with the greatest offset less than or equal to the given offset, @@ -171,7 +170,7 @@ class LogSegments(topicPartition: TopicPartition) { * if it exists. */ @threadsafe - def lowerEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.lowerEntry(offset)) + def lowerEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.lowerEntry(offset)) /** * @return the log segment with the greatest offset strictly less than the given offset, @@ -185,7 +184,7 @@ class LogSegments(topicPartition: TopicPartition) { * if it exists. 
*/ @threadsafe - def higherEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.higherEntry(offset)) + def higherEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.higherEntry(offset)) /** * @return the log segment with the smallest offset strictly greater than the given offset, @@ -198,7 +197,7 @@ class LogSegments(topicPartition: TopicPartition) { * @return the entry associated with the smallest offset, if it exists. */ @threadsafe - def firstEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.firstEntry) + def firstEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.firstEntry) /** * @return the log segment associated with the smallest offset, if it exists. @@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) { * @return the entry associated with the greatest offset, if it exists. */ @threadsafe - def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry) + def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry) /** * @return the log segment with the greatest offset, if it exists. */ @threadsafe def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue) + + /** + * Returns an iterable to log segments ordered from lowest base offset to highest. + * Each segment in the returned iterable has a base offset strictly greater than the provided baseOffset. 
+ */ + def higherSegments(baseOffset: Long): Iterable[LogSegment] = { + val view = + Option(segments.higherKey(baseOffset)).map { + higherOffset => segments.tailMap(higherOffset, true) + }.getOrElse(new ConcurrentSkipListMap[Long, LogSegment]()) + view.values.asScala + } } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala index b929b9c4b8a52..821fa56d7874a 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala @@ -46,7 +46,7 @@ class LogSegmentsTest { Utils.delete(logDir) } - private def assertEntry(segment: LogSegment, tested: java.util.Map.Entry[java.lang.Long, LogSegment]): Unit = { + private def assertEntry(segment: LogSegment, tested: java.util.Map.Entry[Long, LogSegment]): Unit = { assertEquals(segment.baseOffset, tested.getKey()) assertEquals(segment, tested.getValue()) } From d83704e34fc9e3e957277358fa4b7eab8ca07edd Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Wed, 12 May 2021 18:17:43 -0700 Subject: [PATCH 2/6] Minor improvement --- core/src/main/scala/kafka/log/Log.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index bcd5b4aa4ec6a..b55f06a73bb73 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1442,11 +1442,11 @@ class Log(@volatile private var _dir: File, while (segmentOpt.isDefined) { val segment = segmentOpt.get val nextSegmentOpt = segmentsIterator.nextOption() - val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = + val (upperBoundOffset, isLastSegmentAndEmpty) = nextSegmentOpt.map { - entry => (entry, entry.baseOffset, false) + nextSegment => (nextSegment.baseOffset, false) }.getOrElse { - (null, logEndOffset, segment.size == 0) + (logEndOffset, segment.size == 0) } if (highWatermark >= upperBoundOffset && 
predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) { From d9b4eb0010fd7d560340696f8e2e033e469b134e Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Wed, 12 May 2021 23:15:28 -0700 Subject: [PATCH 3/6] Address comments --- core/src/main/scala/kafka/log/Log.scala | 5 +---- core/src/main/scala/kafka/log/LogSegments.scala | 8 ++++---- core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala | 8 ++------ 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b55f06a73bb73..f08c099787885 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1200,11 +1200,10 @@ class Log(@volatile private var _dir: File, // Do the read on the segment with a base offset less than the target offset // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log - var done = segmentOpt.isEmpty var fetchDataInfo: FetchDataInfo = null val segmentsIterator = segmentOpt.map(segment => segments.higherSegments(segment.baseOffset)) .getOrElse(List()).iterator - while (!done) { + while (fetchDataInfo == null && segmentOpt.isDefined) { val segment = segmentOpt.get val maxPosition = // Use the max offset position if it is on this segment; otherwise, the segment size is the limit. 
@@ -1216,8 +1215,6 @@ class Log(@volatile private var _dir: File, if (includeAbortedTxns) fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo) } else segmentOpt = segmentsIterator.nextOption() - - done = fetchDataInfo != null || segmentOpt.isEmpty } if (fetchDataInfo != null) fetchDataInfo diff --git a/core/src/main/scala/kafka/log/LogSegments.scala b/core/src/main/scala/kafka/log/LogSegments.scala index d420adc3e7279..76d62ec741784 100644 --- a/core/src/main/scala/kafka/log/LogSegments.scala +++ b/core/src/main/scala/kafka/log/LogSegments.scala @@ -156,7 +156,7 @@ class LogSegments(topicPartition: TopicPartition) { * if it exists. */ @threadsafe - def floorEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.floorEntry(offset)) + private def floorEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.floorEntry(offset)) /** * @return the log segment with the greatest offset less than or equal to the given offset, @@ -170,7 +170,7 @@ class LogSegments(topicPartition: TopicPartition) { * if it exists. */ @threadsafe - def lowerEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.lowerEntry(offset)) + private def lowerEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = Option(segments.lowerEntry(offset)) /** * @return the log segment with the greatest offset strictly less than the given offset, @@ -218,8 +218,8 @@ class LogSegments(topicPartition: TopicPartition) { def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue) /** - * Returns an iterable to log segments ordered from lowest base offset to highest. - * Each segment in the returned iterable has a base offset strictly greater than the provided baseOffset. + * @return an iterable with log segments ordered from lowest base offset to highest, + * each segment returned has a base offset strictly greater than the provided baseOffset. 
*/ def higherSegments(baseOffset: Long): Iterable[LogSegment] = { val view = diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala index 821fa56d7874a..05380893f46a0 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala @@ -158,17 +158,13 @@ class LogSegmentsTest { List(seg1, seg2, seg3, seg4).foreach(segments.add) - // Test floorSegment, floorEntry + // Test floorSegment assertEquals(Some(seg1), segments.floorSegment(2)) - assertEntry(seg1, segments.floorEntry(2).get) assertEquals(Some(seg2), segments.floorSegment(3)) - assertEntry(seg2, segments.floorEntry(3).get) - // Test lowerSegment, lowerEntry + // Test lowerSegment assertEquals(Some(seg1), segments.lowerSegment(3)) - assertEntry(seg1, segments.lowerEntry(3).get) assertEquals(Some(seg2), segments.lowerSegment(4)) - assertEntry(seg2, segments.lowerEntry(4).get) // Test higherSegment, higherEntry assertEquals(Some(seg3), segments.higherSegment(4)) From ff75dfe45e155f1acd5d386755c8a0c1675a70b3 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Sat, 15 May 2021 13:51:35 -0700 Subject: [PATCH 4/6] Fix inconsistent Option/null check in Log.collectAbortedTransactions --- core/src/main/scala/kafka/log/Log.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f08c099787885..6d794fb27054e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1229,12 +1229,10 @@ class Log(@volatile private var _dir: File, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { - val segmentEntry = segments.floorSegment(startOffset).getOrElse { - throw new IllegalStateException(s"Could not find any segment with a base offset <= the given startOffset:$startOffset") - } + 
val segmentEntry = segments.floorSegment(startOffset) val allAbortedTxns = ListBuffer.empty[AbortedTxn] def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns - collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntry, accumulator) + segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator)) allAbortedTxns.toList } From c0e8307ff90c350c2f2a2c198d5c222497a21b91 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Tue, 25 May 2021 00:15:20 -0700 Subject: [PATCH 5/6] Address comments from Jun --- core/src/main/scala/kafka/log/Log.scala | 6 +++--- core/src/main/scala/kafka/log/LogSegments.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6d794fb27054e..579d6b47c414b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1201,10 +1201,10 @@ class Log(@volatile private var _dir: File, // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log var fetchDataInfo: FetchDataInfo = null - val segmentsIterator = segmentOpt.map(segment => segments.higherSegments(segment.baseOffset)) - .getOrElse(List()).iterator while (fetchDataInfo == null && segmentOpt.isDefined) { val segment = segmentOpt.get + val baseOffset = segment.baseOffset + val maxPosition = // Use the max offset position if it is on this segment; otherwise, the segment size is the limit. 
if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment @@ -1214,7 +1214,7 @@ class Log(@volatile private var _dir: File, if (fetchDataInfo != null) { if (includeAbortedTxns) fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo) - } else segmentOpt = segmentsIterator.nextOption() + } else segmentOpt = segments.higherSegment(baseOffset) } if (fetchDataInfo != null) fetchDataInfo diff --git a/core/src/main/scala/kafka/log/LogSegments.scala b/core/src/main/scala/kafka/log/LogSegments.scala index 76d62ec741784..d6886d7fdef13 100644 --- a/core/src/main/scala/kafka/log/LogSegments.scala +++ b/core/src/main/scala/kafka/log/LogSegments.scala @@ -219,13 +219,13 @@ class LogSegments(topicPartition: TopicPartition) { /** * @return an iterable with log segments ordered from lowest base offset to highest, - * each segment returned has a base offset strictly greater than the provided baseOffset. + * each segment returned has a base offset strictly greater than the provided baseOffset. 
*/ def higherSegments(baseOffset: Long): Iterable[LogSegment] = { val view = Option(segments.higherKey(baseOffset)).map { higherOffset => segments.tailMap(higherOffset, true) - }.getOrElse(new ConcurrentSkipListMap[Long, LogSegment]()) + }.getOrElse(collection.immutable.Map[Long, LogSegment]().asJava) view.values.asScala } } From 7a84aa15a943d72ea5e100af4d7406fca4d23fc6 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Wed, 26 May 2021 19:07:53 -0700 Subject: [PATCH 6/6] Fix build in scala 2.12 and add unit tests --- core/src/main/scala/kafka/log/Log.scala | 23 +++++++-- .../unit/kafka/log/LogSegmentsTest.scala | 49 +++++++++++++++++++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 579d6b47c414b..fe60c42faa772 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1266,7 +1266,7 @@ class Log(@volatile private var _dir: File, accumulator(searchResult.abortedTransactions) if (searchResult.isComplete) return - segmentEntryOpt = higherSegments.nextOption() + segmentEntryOpt = nextOption(higherSegments) } } @@ -1433,11 +1433,11 @@ class Log(@volatile private var _dir: File, } else { val deletable = ArrayBuffer.empty[LogSegment] val segmentsIterator = segments.values.iterator - var segmentOpt = segmentsIterator.nextOption() + var segmentOpt = nextOption(segmentsIterator) while (segmentOpt.isDefined) { val segment = segmentOpt.get - val nextSegmentOpt = segmentsIterator.nextOption() - val (upperBoundOffset, isLastSegmentAndEmpty) = + val nextSegmentOpt = nextOption(segmentsIterator) + val (upperBoundOffset: Long, isLastSegmentAndEmpty: Boolean) = nextSegmentOpt.map { nextSegment => (nextSegment.baseOffset, false) }.getOrElse { @@ -2603,6 +2603,21 @@ object Log extends Logging { throw e } } + + /** + * Wraps the value of iterator.next() in an option. 
+ * Note: this facility is a part of the Iterator class starting from Scala 2.13. + * + * @param iterator the iterator to advance by one element, if it has one + * @tparam T the type of object held within the iterator + * @return Some(iterator.next) if a next element exists, None otherwise. + */ + private def nextOption[T](iterator: Iterator[T]): Option[T] = { + if (iterator.hasNext) + Some(iterator.next()) + else + None + } } object LogMetricNames { diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala index 05380893f46a0..9d0765aed687e 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala @@ -174,4 +174,53 @@ class LogSegmentsTest { segments.close() } + + @Test + def testHigherSegments(): Unit = { + val segments = new LogSegments(topicPartition) + + val seg1 = createSegment(1) + val seg2 = createSegment(3) + val seg3 = createSegment(5) + val seg4 = createSegment(7) + val seg5 = createSegment(9) + + List(seg1, seg2, seg3, seg4, seg5).foreach(segments.add) + + // higherSegments(0) should return all segments in order + { + val iterator = segments.higherSegments(0).iterator + List(seg1, seg2, seg3, seg4, seg5).foreach { + segment => + assertTrue(iterator.hasNext) + assertEquals(segment, iterator.next()) + } + assertFalse(iterator.hasNext) + } + + // higherSegments(1) should return all segments in order except seg1 + { + val iterator = segments.higherSegments(1).iterator + List(seg2, seg3, seg4, seg5).foreach { + segment => + assertTrue(iterator.hasNext) + assertEquals(segment, iterator.next()) + } + assertFalse(iterator.hasNext) + } + + // higherSegments(8) should return only seg5 + { + val iterator = segments.higherSegments(8).iterator + assertTrue(iterator.hasNext) + assertEquals(seg5, iterator.next()) + assertFalse(iterator.hasNext) + } + + // higherSegments(9) should return no segments + { + val iterator = segments.higherSegments(9).iterator + 
assertFalse(iterator.hasNext) + } + } }