From db86809cfdab52f16569aa32e76942b6bcfe63b5 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Wed, 24 Mar 2021 17:57:39 -0700 Subject: [PATCH 1/4] Initial commit --- core/src/main/scala/kafka/log/Log.scala | 152 +++++------- .../main/scala/kafka/log/LogSegments.scala | 225 ++++++++++++++++++ .../unit/kafka/log/LogSegmentsTest.scala | 164 +++++++++++++ 3 files changed, 452 insertions(+), 89 deletions(-) create mode 100644 core/src/main/scala/kafka/log/LogSegments.scala create mode 100644 core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0ad82f46b9d4b..8f34032f8794f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -24,7 +24,7 @@ import java.text.NumberFormat import java.util.Map.{Entry => JEntry} import java.util.Optional import java.util.concurrent.atomic._ -import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit} +import java.util.concurrent.TimeUnit import java.util.regex.Pattern import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0} @@ -308,7 +308,7 @@ class Log(@volatile private var _dir: File, @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset) /* the actual segments of the log */ - private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] + private val segments: LogSegments = new LogSegments(topicPartition) // Visible for testing @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None @@ -331,7 +331,7 @@ class Log(@volatile private var _dir: File, leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset)) - updateLogStartOffset(math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)) + updateLogStartOffset(math.max(logStartOffset, segments.firstSegment.get.baseOffset)) // The earliest leader epoch may not be flushed during a hard failure. Recover it here. leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) @@ -344,7 +344,7 @@ class Log(@volatile private var _dir: File, // Reload all snapshots into the ProducerStateManager cache, the intermediate ProducerStateManager used // during log recovery may have deleted some files without the Log.producerStateManager instance witnessing the // deletion. - producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq) + producerStateManager.removeStraySnapshots(segments.baseOffsets) loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown) // Delete partition metadata file if the version does not support topic IDs. @@ -785,7 +785,7 @@ class Log(@volatile private var _dir: File, // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry // loading of segments. In that case, we also need to close all segments that could have been left open in previous // call to loadSegmentFiles(). - logSegments.foreach(_.close()) + segments.close() segments.clear() loadSegmentFiles() } @@ -850,7 +850,7 @@ class Log(@volatile private var _dir: File, private[log] def recoverLog(): Long = { /** return the log end offset if valid */ def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = { - if (logSegments.nonEmpty) { + if (segments.nonEmpty) { val logEndOffset = activeSegment.readNextOffset if (logEndOffset >= logStartOffset) Some(logEndOffset) @@ -929,11 +929,11 @@ class Log(@volatile private var _dir: File, reloadFromCleanShutdown: Boolean, producerStateManager: ProducerStateManager): Unit = lock synchronized { checkIfMemoryMappedBufferClosed() - val segments = logSegments + val allSegments = logSegments val offsetsToSnapshot = - if (segments.nonEmpty) { - val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset) - Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset)) + if (allSegments.nonEmpty) { + val nextLatestSegmentBaseOffset = segments.lowerSegment(allSegments.last.baseOffset).map(_.baseOffset) + Seq(nextLatestSegmentBaseOffset, Some(allSegments.last.baseOffset), Some(lastOffset)) } else { Seq(Some(lastOffset)) } @@ -974,7 +974,7 @@ class Log(@volatile private var _dir: File, // and we can skip the loading. This is an optimization for users which are not yet using // idempotent/transactional features yet. if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) { - val segmentOfLastOffset = floorLogSegment(lastOffset) + val segmentOfLastOffset = segments.floorSegment(lastOffset) logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset) @@ -1043,7 +1043,7 @@ class Log(@volatile private var _dir: File, * The number of segments in the log. * Take care! this is an O(n) operation. */ - def numberOfSegments: Int = segments.size + def numberOfSegments: Int = segments.numberOfSegments /** * Close this log. @@ -1059,7 +1059,7 @@ class Log(@volatile private var _dir: File, // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization // (the clean shutdown file is written after the logs are all closed). producerStateManager.takeSnapshot() - logSegments.foreach(_.close()) + segments.close() } } } @@ -1077,7 +1077,7 @@ class Log(@volatile private var _dir: File, if (renamedDir != dir) { _dir = renamedDir _parentDir = renamedDir.getParent - logSegments.foreach(_.updateParentDir(renamedDir)) + segments.updateParentDir(renamedDir) producerStateManager.updateParentDir(dir) // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference // the checkpoint file in renamed log directory @@ -1094,7 +1094,7 @@ class Log(@volatile private var _dir: File, def closeHandlers(): Unit = { debug("Closing handlers") lock synchronized { - logSegments.foreach(_.closeHandlers()) + segments.closeHandlers() isMemoryMappedBufferClosed = true } } @@ -1603,10 +1603,10 @@ class Log(@volatile private var _dir: File, // We create the local variables to avoid race conditions with updates to the log. val endOffsetMetadata = nextOffsetMetadata val endOffset = endOffsetMetadata.messageOffset - var segmentEntry = segments.floorEntry(startOffset) + var segmentEntryOpt = segments.floorEntry(startOffset) // return error on attempt to read beyond the log end offset or read below log start offset - if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset) + if (startOffset > endOffset || segmentEntryOpt.isEmpty || startOffset < logStartOffset) throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments in the range $logStartOffset to $endOffset.") @@ -1624,9 +1624,11 @@ class Log(@volatile private var _dir: File, // Do the read on the segment with a base offset less than the target offset // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log - var done = segmentEntry == null + var done = segmentEntryOpt.isEmpty var fetchDataInfo: FetchDataInfo = null while (!done) { + val segmentEntry = segmentEntryOpt.get + val baseOffset = segmentEntry.getKey val segment = segmentEntry.getValue val maxPosition = @@ -1638,9 +1640,9 @@ class Log(@volatile private var _dir: File, if (fetchDataInfo != null) { if (includeAbortedTxns) fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo) - } else segmentEntry = segments.higherEntry(segmentEntry.getKey) + } else segmentEntryOpt = segments.higherEntry(baseOffset) - done = fetchDataInfo != null || segmentEntry == null + done = fetchDataInfo != null || segmentEntryOpt.isEmpty } if (fetchDataInfo != null) fetchDataInfo @@ -1655,10 +1657,10 @@ class Log(@volatile private var _dir: File, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { - val segmentEntry = segments.floorEntry(startOffset) + val segmentEntryOpt = segments.floorEntry(startOffset) val allAbortedTxns = ListBuffer.empty[AbortedTxn] def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns - collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntry, accumulator) + collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntryOpt.get, accumulator) allAbortedTxns.toList } @@ -1668,11 +1670,7 @@ class Log(@volatile private var _dir: File, val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) val upperBoundOffset = segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { - val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey) - if (nextSegmentEntry != null) - nextSegmentEntry.getValue.baseOffset - else - logEndOffset + segments.higherSegment(segmentEntry.getKey).map(_.baseOffset).getOrElse(logEndOffset) } val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction] @@ -1688,13 +1686,15 @@ class Log(@volatile private var _dir: File, private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long, startingSegmentEntry: JEntry[JLong, LogSegment], accumulator: List[AbortedTxn] => Unit): Unit = { - var segmentEntry = startingSegmentEntry - while (segmentEntry != null) { - val searchResult = segmentEntry.getValue.collectAbortedTxns(startOffset, upperBoundOffset) + var segmentEntryOpt = Option(startingSegmentEntry) + while (segmentEntryOpt.isDefined) { + val baseOffset = segmentEntryOpt.get.getKey + val segment = segmentEntryOpt.get.getValue + val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset) accumulator(searchResult.abortedTransactions) if (searchResult.isComplete) return - segmentEntry = segments.higherEntry(segmentEntry.getKey) + segmentEntryOpt = segments.higherEntry(baseOffset) } } @@ -1752,19 +1752,19 @@ class Log(@volatile private var _dir: File, def legacyFetchOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. - val segments = logSegments.toBuffer - val lastSegmentHasSize = segments.last.size > 0 + val allSegments = logSegments.toBuffer + val lastSegmentHasSize = allSegments.last.size > 0 val offsetTimeArray = if (lastSegmentHasSize) - new Array[(Long, Long)](segments.length + 1) + new Array[(Long, Long)](allSegments.length + 1) else - new Array[(Long, Long)](segments.length) + new Array[(Long, Long)](allSegments.length) - for (i <- segments.indices) - offsetTimeArray(i) = (math.max(segments(i).baseOffset, logStartOffset), segments(i).lastModified) + for (i <- allSegments.indices) + offsetTimeArray(i) = (math.max(allSegments(i).baseOffset, logStartOffset), allSegments(i).lastModified) if (lastSegmentHasSize) - offsetTimeArray(segments.length) = (logEndOffset, time.milliseconds) + offsetTimeArray(allSegments.length) = (logEndOffset, time.milliseconds) var startIndex = -1 timestamp match { @@ -1830,13 +1830,13 @@ class Log(@volatile private var _dir: File, val numToDelete = deletable.size if (numToDelete > 0) { // we must always have at least one segment, so if we are going to delete all the segments, create a new one first - if (segments.size == numToDelete) + if (numberOfSegments == numToDelete) roll() lock synchronized { checkIfMemoryMappedBufferClosed() // remove the segments for lookups removeAndDeleteSegments(deletable, asyncDelete = true, reason) - maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentDeletion) + maybeIncrementLogStartOffset(segments.firstSegment.get.baseOffset, SegmentDeletion) } } numToDelete @@ -1860,20 +1860,23 @@ class Log(@volatile private var _dir: File, Seq.empty } else { val deletable = ArrayBuffer.empty[LogSegment] - var segmentEntry = segments.firstEntry - while (segmentEntry != null) { + var segmentEntryOpt = segments.firstEntry + while (segmentEntryOpt.isDefined) { + val segmentEntry = segmentEntryOpt.get val segment = segmentEntry.getValue - val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey) - val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null) - (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false) - else - (null, logEndOffset, segment.size == 0) + val nextSegmentEntryOpt = segments.higherEntry(segmentEntry.getKey) + val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = + nextSegmentEntryOpt.map { + entry => (entry.getValue, entry.getValue.baseOffset, false) + }.getOrElse { + (null, logEndOffset, segment.size == 0) + } if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) { deletable += segment - segmentEntry = nextSegmentEntry + segmentEntryOpt = nextSegmentEntryOpt } else { - segmentEntry = null + segmentEntryOpt = Option.empty } } deletable @@ -2011,7 +2014,7 @@ class Log(@volatile private var _dir: File, val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset) val logFile = Log.logFile(dir, newOffset) - if (segments.containsKey(newOffset)) { + if (segments.contains(newOffset)) { // segment with the same base offset already exists and loaded if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) { // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an @@ -2040,7 +2043,7 @@ class Log(@volatile private var _dir: File, Files.delete(file.toPath) } - Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment()) + segments.lastSegment.foreach(_.onBecomeInactiveSegment()) } // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot @@ -2106,9 +2109,6 @@ class Log(@volatile private var _dir: File, } } - private def lowerSegment(offset: Long): Option[LogSegment] = - Option(segments.lowerEntry(offset)).map(_.getValue) - /** * Completely delete this log directory and all contents from the file system with no delay */ @@ -2173,7 +2173,7 @@ class Log(@volatile private var _dir: File, info(s"Truncating to offset $targetOffset") lock synchronized { checkIfMemoryMappedBufferClosed() - if (segments.firstEntry.getValue.baseOffset > targetOffset) { + if (segments.firstSegment.get.baseOffset > targetOffset) { truncateFullyAndStartAt(targetOffset) } else { val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) @@ -2239,49 +2239,23 @@ class Log(@volatile private var _dir: File, /** * The active segment that is currently taking appends */ - def activeSegment = segments.lastEntry.getValue + def activeSegment = segments.activeSegment /** * All the log segments in this log ordered from oldest to newest */ - def logSegments: Iterable[LogSegment] = segments.values.asScala + def logSegments: Iterable[LogSegment] = segments.values /** * Get all segments beginning with the segment that includes "from" and ending with the segment * that includes up to "to-1" or the end of the log (if to > logEndOffset). */ - def logSegments(from: Long, to: Long): Iterable[LogSegment] = { - if (from == to) { - // Handle non-segment-aligned empty sets - List.empty[LogSegment] - } else if (to < from) { - throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + - s"from offset $from which is greater than limit offset $to") - } else { - lock synchronized { - val view = Option(segments.floorKey(from)).map { floor => - segments.subMap(floor, to) - }.getOrElse(segments.headMap(to)) - view.values.asScala - } - } + def logSegments(from: Long, to: Long): Iterable[LogSegment] = lock synchronized { + segments.values(from, to) } - def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = { - lock synchronized { - if (from > activeSegment.baseOffset) - Seq.empty - else - logSegments(from, activeSegment.baseOffset) - } - } - - /** - * Get the largest log segment with a base offset less than or equal to the given offset, if one exists. - * @return the optional log segment - */ - private def floorLogSegment(offset: Long): Option[LogSegment] = { - Option(segments.floorEntry(offset)).map(_.getValue) + def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = lock synchronized { + segments.nonActiveLogSegmentsFrom(from) } override def toString: String = { @@ -2399,7 +2373,7 @@ class Log(@volatile private var _dir: File, // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment() // multiple times for the same segment. - val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset) + val sortedOldSegments = oldSegments.filter(seg => segments.contains(seg.baseOffset)).sortBy(_.baseOffset) checkIfMemoryMappedBufferClosed() // need to do this in two phases to be crash safe AND do the delete asynchronously @@ -2451,7 +2425,7 @@ class Log(@volatile private var _dir: File, * @param segment The segment to add */ @threadsafe - private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment) + private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.add(segment) private def maybeHandleIOException[T](msg: => String)(fun: => T): T = { try { diff --git a/core/src/main/scala/kafka/log/LogSegments.scala b/core/src/main/scala/kafka/log/LogSegments.scala new file mode 100644 index 0000000000000..5a8fb9aa70d44 --- /dev/null +++ b/core/src/main/scala/kafka/log/LogSegments.scala @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io.File +import java.lang.{Long => JLong} +import java.util.Map +import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} + +import kafka.utils.threadsafe +import org.apache.kafka.common.TopicPartition + +import scala.jdk.CollectionConverters._ + +/** + * This class encapsulates a thread-safe navigable map of LogSegment instances and provides the + * required read and write behavior on the map. + * + * @param topicPartition the TopicPartition associated with the segments + * (useful for logging purposes) + */ +class LogSegments(private val topicPartition: TopicPartition) { + + /* the segments of the log with key being LogSegment base offset and value being a LogSegment */ + private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] + + /** + * @return true if the segments are empty, false otherwise. + */ + @threadsafe + def isEmpty: Boolean = segments.isEmpty + + /** + * @return true if the segments are non-empty, false otherwise. + */ + @threadsafe + def nonEmpty: Boolean = !isEmpty + + /** + * Add the given segment, or replace an existing entry. + * + * @param segment the segment to add + */ + @threadsafe + def add(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment) + + /** + * Remove the segment at the provided offset. + * + * @param offset the offset to be removed + */ + @threadsafe + def remove(offset: Long): Unit = segments.remove(offset) + + /** + * Clears all entries. + */ + @threadsafe + def clear(): Unit = segments.clear() + + /** + * Close all segments. + */ + def close(): Unit = values.foreach(_.close()) + + /** + * Close the handlers for all segments. + */ + def closeHandlers(): Unit = values.foreach(_.closeHandlers()) + + /** + * Update the directory reference for the log and indices of all segments. + * + * @param dir the renamed directory + */ + def updateParentDir(dir: File): Unit = values.foreach(_.updateParentDir(dir)) + + /** + * Take care! this is an O(n) operation, where n is the number of segments. + * + * @return The number of segments. + * + */ + @threadsafe + def numberOfSegments: Int = segments.size + + /** + * The active segment that is currently taking appends. + */ + @threadsafe + def activeSegment = lastEntry.get.getValue + + /** + * @return the base offsets of all segments + */ + def baseOffsets: Seq[Long] = segments.values().asScala.map(_.baseOffset).toSeq + + /** + * @param offset the segment to be checked + * @return true if a segment exists at the provided offset, false otherwise. + */ + @threadsafe + def contains(offset: Long): Boolean = segments.containsKey(offset) + + /** + * Retrieves a segment at the specified offset. + * + * @param offset the segment to be retrieved + * + * @return the segment if it exists, otherwise None. + */ + @threadsafe + def get(offset: Long): Option[LogSegment] = Option(segments.get(offset)) + + /** + * @return an iterator to the log segments ordered from oldest to newest. + */ + def values: Iterable[LogSegment] = segments.values.asScala + + /** + * @return An iterator to all segments beginning with the segment that includes "from" and ending + * with the segment that includes up to "to-1" or the end of the log (if to > end of log). + */ + def values(from: Long, to: Long): Iterable[LogSegment] = { + if (from == to) { + // Handle non-segment-aligned empty sets + List.empty[LogSegment] + } else if (to < from) { + throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + + s"from offset $from which is greater than limit offset $to") + } else { + val view = Option(segments.floorKey(from)).map { floor => + segments.subMap(floor, to) + }.getOrElse(segments.headMap(to)) + view.values.asScala + } + } + + def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = { + if (from > activeSegment.baseOffset) + Seq.empty + else + values(from, activeSegment.baseOffset) + } + + /** + * @return the entry associated with the greatest offset less than or equal to the given offset, + * if it exists. + */ + @threadsafe + def floorEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.floorEntry(offset)) + + /** + * @return the log segment with the greatest offset less than or equal to the given offset, + * if it exists. + */ + @threadsafe + def floorSegment(offset: Long): Option[LogSegment] = floorEntry(offset).map(_.getValue) + + /** + * @return the entry associated with the greatest offset strictly less than the given offset, + * if it exists. + */ + @threadsafe + def lowerEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.lowerEntry(offset)) + + /** + * @return the log segment with the greatest offset strictly less than the given offset, + * if it exists. + */ + @threadsafe + def lowerSegment(offset: Long): Option[LogSegment] = lowerEntry(offset).map(_.getValue) + + /** + * @return the entry associated with the smallest offset strictly greater than the given offset, + * if it exists. + */ + @threadsafe + def higherEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = Option(segments.higherEntry(offset)) + + /** + * @return the log segment with the smallest offset strictly greater than the given offset, + * if it exists. + */ + @threadsafe + def higherSegment(offset: Long): Option[LogSegment] = higherEntry(offset).map(_.getValue) + + /** + * @return the entry associated with the smallest offset, if it exists. + */ + @threadsafe + def firstEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.firstEntry) + + /** + * @return the log segment associated with the smallest offset, if it exists. + */ + @threadsafe + def firstSegment: Option[LogSegment] = firstEntry.map(_.getValue) + + /** + * @return the entry associated with the greatest offset, if it exists. + */ + @threadsafe + def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry) + + /** + * @return the log segment with the greatest offset, if it exists. + */ + @threadsafe + def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue) +} diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala new file mode 100644 index 0000000000000..538c45cfc7175 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io.File + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Time +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +class LogSegmentsTest { + + val topicPartition = new TopicPartition("topic", 0) + var logDir: File = _ + + /* create a segment with the given base offset */ + private def createSegment(offset: Long, + indexIntervalBytes: Int = 10, + time: Time = Time.SYSTEM): LogSegment = { + LogUtils.createSegment(offset, logDir, indexIntervalBytes, time) + } + + private def assertEntry(segment: LogSegment, tested: java.util.Map.Entry[java.lang.Long, LogSegment]): Unit = { + assertEquals(segment.baseOffset, tested.getKey()) + assertEquals(segment, tested.getValue()) + } + + @Test + def testBasicOperations(): Unit = { + val segments = new LogSegments(topicPartition) + assertTrue(segments.isEmpty) + assertFalse(segments.nonEmpty) + + val offset1 = 40 + val seg1 = createSegment(offset1) + val offset2 = 80 + val seg2 = createSegment(offset2) + val seg3 = createSegment(offset1) + + // Add seg1 + segments.add(seg1) + assertFalse(segments.isEmpty) + assertTrue(segments.nonEmpty) + assertEquals(1, segments.numberOfSegments) + assertTrue(segments.contains(offset1)) + assertEquals(Some(seg1), segments.get(offset1)) + + // Add seg2 + segments.add(seg2) + assertFalse(segments.isEmpty) + assertTrue(segments.nonEmpty) + assertEquals(2, segments.numberOfSegments) + assertTrue(segments.contains(offset2)) + assertEquals(Some(seg2), segments.get(offset2)) + + // Replace seg1 with seg3 + segments.add(seg3) + assertFalse(segments.isEmpty) + assertTrue(segments.nonEmpty) + assertEquals(2, segments.numberOfSegments) + assertTrue(segments.contains(offset1)) + assertEquals(Some(seg3), segments.get(offset1)) + + // Remove seg2 + segments.remove(offset2) + assertFalse(segments.isEmpty) + assertTrue(segments.nonEmpty) + assertEquals(1, segments.numberOfSegments) + assertFalse(segments.contains(offset2)) + + // Clear all segments including seg3 + segments.clear() + assertTrue(segments.isEmpty) + assertFalse(segments.nonEmpty) + assertEquals(0, segments.numberOfSegments) + assertFalse(segments.contains(offset1)) + } + + @Test + def testSegmentAccess(): Unit = { + val segments = new LogSegments(topicPartition) + val offset1 = 1 + val seg1 = createSegment(offset1) + val offset2 = 2 + val seg2 = createSegment(offset2) + val offset3 = 3 + val seg3 = createSegment(offset3) + val offset4 = 4 + val seg4 = createSegment(offset4) + + // Test firstEntry, lastEntry + List(seg1, seg2, seg3, seg4).foreach { + seg => + segments.add(seg) + assertEquals(seg, segments.activeSegment) + assertEntry(seg1, segments.firstEntry.get) + assertEquals(Some(seg1), segments.firstSegment) + assertEntry(seg, segments.lastEntry.get) + assertEquals(Some(seg), segments.lastSegment) + } + + // Test baseOffsets + assertEquals(Seq(offset1, offset2, offset3, offset4), segments.baseOffsets) + + // Test values + assertEquals(Seq(seg1, seg2, seg3, seg4), segments.values.toSeq) + + // Test values(to, from) + assertThrows(classOf[IllegalArgumentException], () => segments.values(2, 1)) + assertEquals(Seq(), segments.values(1, 1).toSeq) + assertEquals(Seq(seg1), segments.values(1, 2).toSeq) + assertEquals(Seq(seg1, seg2), segments.values(1, 3).toSeq) + assertEquals(Seq(seg1, seg2, seg3), segments.values(1, 4).toSeq) + assertEquals(Seq(seg2, seg3), segments.values(2, 4).toSeq) + assertEquals(Seq(seg3), segments.values(3, 4).toSeq) + assertEquals(Seq(), segments.values(4, 4).toSeq) + assertEquals(Seq(seg4), segments.values(4, 5).toSeq) + + } + + @Test + def testClosestMatchOperations(): Unit = { + val segments = new LogSegments(topicPartition) + val seg1 = createSegment(1) + val seg2 = createSegment(3) + val seg3 = createSegment(5) + val seg4 = createSegment(7) + + List(seg1, seg2, seg3, seg4).foreach(segments.add) + + // Test floorSegment, floorEntry + assertEquals(Some(seg1), segments.floorSegment(2)) + assertEntry(seg1, segments.floorEntry(2).get) + assertEquals(Some(seg2), segments.floorSegment(3)) + assertEntry(seg2, segments.floorEntry(3).get) + + // Test lowerSegment, lowerEntry + assertEquals(Some(seg1), segments.lowerSegment(3)) + assertEntry(seg1, segments.lowerEntry(3).get) + assertEquals(Some(seg2), segments.lowerSegment(4)) + assertEntry(seg2, segments.lowerEntry(4).get) + + // Test higherSegment, higherEntry + assertEquals(Some(seg3), segments.higherSegment(4)) + assertEntry(seg3, segments.higherEntry(4).get) + assertEquals(Some(seg4), segments.higherSegment(5)) + assertEntry(seg4, segments.higherEntry(5).get) + } +} From 52f23db2218c8aa2728167af769d9a41bf9c362c Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Mon, 29 Mar 2021 11:31:15 -0700 Subject: [PATCH 2/4] Address comments from Jun --- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogSegments.scala | 7 +------ core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala | 1 - 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8f34032f8794f..091f1e5978a8f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2239,7 +2239,7 @@ class Log(@volatile private var _dir: File, /** * The active segment that is currently taking appends */ - def activeSegment = segments.activeSegment + def activeSegment = segments.lastSegment.get /** * All the log segments in this log ordered from oldest to newest diff --git a/core/src/main/scala/kafka/log/LogSegments.scala b/core/src/main/scala/kafka/log/LogSegments.scala index 5a8fb9aa70d44..fd1cb062a8d13 100644 --- a/core/src/main/scala/kafka/log/LogSegments.scala +++ b/core/src/main/scala/kafka/log/LogSegments.scala @@ -98,12 +98,6 @@ class LogSegments(private val topicPartition: TopicPartition) { @threadsafe def numberOfSegments: Int = segments.size - /** - * The active segment that is currently taking appends. - */ - @threadsafe - def activeSegment = lastEntry.get.getValue - /** * @return the base offsets of all segments */ @@ -151,6 +145,7 @@ class LogSegments(private val topicPartition: TopicPartition) { } def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = { + val activeSegment = lastSegment.get if (from > activeSegment.baseOffset) Seq.empty else diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala index 538c45cfc7175..17888aee67003 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala @@ -107,7 +107,6 @@ class LogSegmentsTest { List(seg1, seg2, seg3, seg4).foreach { seg => segments.add(seg) - assertEquals(seg, segments.activeSegment) assertEntry(seg1, segments.firstEntry.get) assertEquals(Some(seg1), segments.firstSegment) assertEntry(seg, segments.lastEntry.get) From 2bb3c19446f6d0d4d1ec8a8456c3d9885ab8c987 Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Mon, 29 Mar 2021 11:34:18 -0700 Subject: [PATCH 3/4] Minor improvement --- core/src/main/scala/kafka/log/LogSegments.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/LogSegments.scala b/core/src/main/scala/kafka/log/LogSegments.scala index fd1cb062a8d13..f0f20c9ddb392 100644 --- a/core/src/main/scala/kafka/log/LogSegments.scala +++ b/core/src/main/scala/kafka/log/LogSegments.scala @@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._ * @param topicPartition the TopicPartition associated with the segments * (useful for logging purposes) */ -class LogSegments(private val topicPartition: TopicPartition) { +class LogSegments(topicPartition: TopicPartition) { /* the segments of the log with key being LogSegment base offset and value being a LogSegment */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] From 9dbb679736133877226a2ee705f964cb5ae75efc Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Mon, 29 Mar 2021 14:19:17 -0700 Subject: [PATCH 4/4] Address comments from Dhruvil --- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogSegments.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 091f1e5978a8f..43f858d84afa8 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -344,7 +344,7 @@ class Log(@volatile private var _dir: File, // Reload all snapshots into the ProducerStateManager cache, the intermediate ProducerStateManager used // during log recovery may have deleted some files without the Log.producerStateManager instance witnessing the // deletion. - producerStateManager.removeStraySnapshots(segments.baseOffsets) + producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq) loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown) // Delete partition metadata file if the version does not support topic IDs. diff --git a/core/src/main/scala/kafka/log/LogSegments.scala b/core/src/main/scala/kafka/log/LogSegments.scala index f0f20c9ddb392..d9e564ed4b13d 100644 --- a/core/src/main/scala/kafka/log/LogSegments.scala +++ b/core/src/main/scala/kafka/log/LogSegments.scala @@ -101,7 +101,7 @@ class LogSegments(topicPartition: TopicPartition) { /** * @return the base offsets of all segments */ - def baseOffsets: Seq[Long] = segments.values().asScala.map(_.baseOffset).toSeq + def baseOffsets: Iterable[Long] = segments.values().asScala.map(_.baseOffset) /** * @param offset the segment to be checked