core/src/main/scala/kafka/log/Log.scala (152 changes: 63 additions & 89 deletions)
@@ -24,7 +24,7 @@ import java.text.NumberFormat
import java.util.Map.{Entry => JEntry}
import java.util.Optional
import java.util.concurrent.atomic._
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
@@ -308,7 +308,7 @@ class Log(@volatile private var _dir: File,
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
private val segments: LogSegments = new LogSegments(topicPartition)

// Visible for testing
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
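This is the central change: `Log` no longer holds the raw `ConcurrentSkipListMap`; segment bookkeeping moves behind a `LogSegments` wrapper. That class is outside this file's diff, so below is a minimal sketch of the API implied by the call sites in the rest of the diff, assuming `LogSegments` keeps the same skip list keyed by base offset. All member names are inferred from usage in this file; `LogSegment` and `TopicPartition` are the existing `kafka.log` / `org.apache.kafka.common` types assumed to be in scope.

```scala
import java.io.File
import java.util.Map.{Entry => JEntry}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import scala.jdk.CollectionConverters._

// Sketch only: shape inferred from the call sites in Log.scala below.
class LogSegments(topicPartition: TopicPartition) {
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] =
    new ConcurrentSkipListMap[java.lang.Long, LogSegment]

  // Mutators previously invoked on the raw map
  def add(segment: LogSegment): LogSegment = segments.put(segment.baseOffset, segment)
  def clear(): Unit = segments.clear()

  // Bulk operations that used to be logSegments.foreach(...) in Log
  def close(): Unit = values.foreach(_.close())
  def closeHandlers(): Unit = values.foreach(_.closeHandlers())
  def updateParentDir(dir: File): Unit = values.foreach(_.updateParentDir(dir))

  def contains(baseOffset: Long): Boolean = segments.containsKey(baseOffset)
  def nonEmpty: Boolean = !segments.isEmpty
  def numberOfSegments: Int = segments.size
  def values: Iterable[LogSegment] = segments.values.asScala
  def baseOffsets: Iterable[Long] = values.map(_.baseOffset)

  // Nullable skip-list lookups wrapped in Option, replacing `== null` checks
  def firstEntry: Option[JEntry[java.lang.Long, LogSegment]] = Option(segments.firstEntry)
  def floorEntry(offset: Long): Option[JEntry[java.lang.Long, LogSegment]] = Option(segments.floorEntry(offset))
  def higherEntry(offset: Long): Option[JEntry[java.lang.Long, LogSegment]] = Option(segments.higherEntry(offset))

  def firstSegment: Option[LogSegment] = firstEntry.map(_.getValue)
  def lastSegment: Option[LogSegment] = Option(segments.lastEntry).map(_.getValue)
  def floorSegment(offset: Long): Option[LogSegment] = floorEntry(offset).map(_.getValue)
  def lowerSegment(offset: Long): Option[LogSegment] = Option(segments.lowerEntry(offset)).map(_.getValue)
  def higherSegment(offset: Long): Option[LogSegment] = higherEntry(offset).map(_.getValue)

  // values(from, to) and nonActiveLogSegmentsFrom are sketched further down,
  // next to their call sites.
}
```

Wrapping the nullable `NavigableMap` lookups in `Option` at a single choke point is what lets the call sites below trade `!= null` checks for `isEmpty`/`isDefined` and `map`/`getOrElse`.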
@@ -331,7 +331,7 @@

leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))

updateLogStartOffset(math.max(logStartOffset, segments.firstEntry.getValue.baseOffset))
updateLogStartOffset(math.max(logStartOffset, segments.firstSegment.get.baseOffset))

// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
@@ -344,7 +344,7 @@
// Reload all snapshots into the ProducerStateManager cache; the intermediate ProducerStateManager used
// during log recovery may have deleted some files without the Log.producerStateManager instance witnessing the
// deletion.
producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq)
loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)

// Delete partition metadata file if the version does not support topic IDs.
@@ -785,7 +785,7 @@
// In case we encounter a segment with offset overflow, the retry logic will split it, after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in a previous
// call to loadSegmentFiles().
logSegments.foreach(_.close())
segments.close()
segments.clear()
loadSegmentFiles()
}
@@ -850,7 +850,7 @@
private[log] def recoverLog(): Long = {
/** Return the log end offset if valid */
def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
if (logSegments.nonEmpty) {
if (segments.nonEmpty) {
val logEndOffset = activeSegment.readNextOffset
if (logEndOffset >= logStartOffset)
Some(logEndOffset)
@@ -929,11 +929,11 @@
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
val segments = logSegments
val allSegments = logSegments
val offsetsToSnapshot =
if (segments.nonEmpty) {
val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset)
Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset))
if (allSegments.nonEmpty) {
val nextLatestSegmentBaseOffset = segments.lowerSegment(allSegments.last.baseOffset).map(_.baseOffset)
Seq(nextLatestSegmentBaseOffset, Some(allSegments.last.baseOffset), Some(lastOffset))
} else {
Seq(Some(lastOffset))
}
@@ -974,7 +974,7 @@
// and we can skip the loading. This is an optimization for users who are not yet using
// idempotent/transactional features.
if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
val segmentOfLastOffset = floorLogSegment(lastOffset)
val segmentOfLastOffset = segments.floorSegment(lastOffset)

logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
@@ -1043,7 +1043,7 @@
* The number of segments in the log.
* Take care! This is an O(n) operation.
*/
def numberOfSegments: Int = segments.size
def numberOfSegments: Int = segments.numberOfSegments
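The docstring's warning survives the refactor: ConcurrentSkipListMap.size() computes its result by traversing the entries, since a lock-free skip list cannot maintain a reliable cached counter under concurrent mutation. A tiny standalone illustration (the demo object name is mine, not from the patch):

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SkipListSizeDemo extends App {
  val m = new ConcurrentSkipListMap[java.lang.Long, String]()
  (0L until 5L).foreach(i => m.put(i, s"segment-$i"))
  println(m.isEmpty) // false; O(1) and preferable on hot paths
  println(m.size)    // 5; O(n), computed by walking the entries
}
```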

/**
* Close this log.
@@ -1059,7 +1059,7 @@
// after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
// (the clean shutdown file is written after the logs are all closed).
producerStateManager.takeSnapshot()
logSegments.foreach(_.close())
segments.close()
}
}
}
@@ -1077,7 +1077,7 @@
if (renamedDir != dir) {
_dir = renamedDir
_parentDir = renamedDir.getParent
logSegments.foreach(_.updateParentDir(renamedDir))
segments.updateParentDir(renamedDir)
producerStateManager.updateParentDir(dir)
// re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
// the checkpoint file in renamed log directory
@@ -1094,7 +1094,7 @@
def closeHandlers(): Unit = {
debug("Closing handlers")
lock synchronized {
logSegments.foreach(_.closeHandlers())
segments.closeHandlers()
isMemoryMappedBufferClosed = true
}
}
@@ -1603,10 +1603,10 @@
// We create the local variables to avoid race conditions with updates to the log.
val endOffsetMetadata = nextOffsetMetadata
val endOffset = endOffsetMetadata.messageOffset
var segmentEntry = segments.floorEntry(startOffset)
var segmentEntryOpt = segments.floorEntry(startOffset)

// return error on attempt to read beyond the log end offset or read below log start offset
if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
if (startOffset > endOffset || segmentEntryOpt.isEmpty || startOffset < logStartOffset)
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments in the range $logStartOffset to $endOffset.")

@@ -1624,9 +1624,11 @@
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
var done = segmentEntry == null
var done = segmentEntryOpt.isEmpty
var fetchDataInfo: FetchDataInfo = null
while (!done) {
val segmentEntry = segmentEntryOpt.get
val baseOffset = segmentEntry.getKey
val segment = segmentEntry.getValue

val maxPosition =
Expand All @@ -1638,9 +1640,9 @@ class Log(@volatile private var _dir: File,
if (fetchDataInfo != null) {
if (includeAbortedTxns)
fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
} else segmentEntry = segments.higherEntry(segmentEntry.getKey)
} else segmentEntryOpt = segments.higherEntry(baseOffset)

done = fetchDataInfo != null || segmentEntry == null
done = fetchDataInfo != null || segmentEntryOpt.isEmpty
}

if (fetchDataInfo != null) fetchDataInfo
@@ -1655,10 +1657,10 @@
}

private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
val segmentEntry = segments.floorEntry(startOffset)
val segmentEntryOpt = segments.floorEntry(startOffset)
val allAbortedTxns = ListBuffer.empty[AbortedTxn]
def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntry, accumulator)
collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntryOpt.get, accumulator)
allAbortedTxns.toList
}

@@ -1668,11 +1670,7 @@
val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
val upperBoundOffset = segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
if (nextSegmentEntry != null)
nextSegmentEntry.getValue.baseOffset
else
logEndOffset
segments.higherSegment(segmentEntry.getKey).map(_.baseOffset).getOrElse(logEndOffset)
}

val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
@@ -1688,13 +1686,15 @@
private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit = {
var segmentEntry = startingSegmentEntry
while (segmentEntry != null) {
val searchResult = segmentEntry.getValue.collectAbortedTxns(startOffset, upperBoundOffset)
var segmentEntryOpt = Option(startingSegmentEntry)
while (segmentEntryOpt.isDefined) {
val baseOffset = segmentEntryOpt.get.getKey
val segment = segmentEntryOpt.get.getValue
val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
accumulator(searchResult.abortedTransactions)
if (searchResult.isComplete)
return
segmentEntry = segments.higherEntry(segmentEntry.getKey)
segmentEntryOpt = segments.higherEntry(baseOffset)
}
}
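read() above, collectAbortedTransactions() here, and deletableSegments() below all switch from null-terminated Map.Entry loops to the same Option-driven walk. A condensed sketch of the shared pattern, using the lookup signatures from the LogSegments sketch earlier (walkSegmentsFrom and visit are illustrative names, not part of the patch):

```scala
// Visit the segment covering `startOffset`, then each successively higher segment;
// `visit` returns true to keep walking, false to stop early.
def walkSegmentsFrom(segments: LogSegments, startOffset: Long)(visit: LogSegment => Boolean): Unit = {
  var entryOpt = segments.floorEntry(startOffset)
  while (entryOpt.isDefined) {
    val entry = entryOpt.get
    entryOpt = if (visit(entry.getValue)) segments.higherEntry(entry.getKey) else None
  }
}
```

Compared with the old code, termination is expressed by the lookup returning None rather than by a sentinel null, so the compiler forces each call site to handle the missing-segment case.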

@@ -1752,19 +1752,19 @@
def legacyFetchOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant-time access; unlike `toArray`, it is also safe to use with concurrent collections.
val segments = logSegments.toBuffer
val lastSegmentHasSize = segments.last.size > 0
val allSegments = logSegments.toBuffer
val lastSegmentHasSize = allSegments.last.size > 0

val offsetTimeArray =
if (lastSegmentHasSize)
new Array[(Long, Long)](segments.length + 1)
new Array[(Long, Long)](allSegments.length + 1)
else
new Array[(Long, Long)](segments.length)
new Array[(Long, Long)](allSegments.length)

for (i <- segments.indices)
offsetTimeArray(i) = (math.max(segments(i).baseOffset, logStartOffset), segments(i).lastModified)
for (i <- allSegments.indices)
offsetTimeArray(i) = (math.max(allSegments(i).baseOffset, logStartOffset), allSegments(i).lastModified)
if (lastSegmentHasSize)
offsetTimeArray(segments.length) = (logEndOffset, time.milliseconds)
offsetTimeArray(allSegments.length) = (logEndOffset, time.milliseconds)

var startIndex = -1
timestamp match {
@@ -1830,13 +1830,13 @@
val numToDelete = deletable.size
if (numToDelete > 0) {
// we must always have at least one segment, so if we are going to delete all the segments, create a new one first
if (segments.size == numToDelete)
if (numberOfSegments == numToDelete)
roll()
lock synchronized {
checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
removeAndDeleteSegments(deletable, asyncDelete = true, reason)
maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentDeletion)
maybeIncrementLogStartOffset(segments.firstSegment.get.baseOffset, SegmentDeletion)
}
}
numToDelete
@@ -1860,20 +1860,23 @@
Seq.empty
} else {
val deletable = ArrayBuffer.empty[LogSegment]
var segmentEntry = segments.firstEntry
while (segmentEntry != null) {
var segmentEntryOpt = segments.firstEntry
while (segmentEntryOpt.isDefined) {
val segmentEntry = segmentEntryOpt.get
val segment = segmentEntry.getValue
val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
(nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
else
(null, logEndOffset, segment.size == 0)
val nextSegmentEntryOpt = segments.higherEntry(segmentEntry.getKey)
val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) =
nextSegmentEntryOpt.map {
entry => (entry.getValue, entry.getValue.baseOffset, false)
}.getOrElse {
(null, logEndOffset, segment.size == 0)
}

if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
deletable += segment
segmentEntry = nextSegmentEntry
segmentEntryOpt = nextSegmentEntryOpt
} else {
segmentEntry = null
segmentEntryOpt = Option.empty
}
}
deletable
@@ -2011,7 +2014,7 @@
val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
val logFile = Log.logFile(dir, newOffset)

if (segments.containsKey(newOffset)) {
if (segments.contains(newOffset)) {
// segment with the same base offset already exists and loaded
if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
// We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
@@ -2040,7 +2043,7 @@
Files.delete(file.toPath)
}

Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
segments.lastSegment.foreach(_.onBecomeInactiveSegment())
}

// take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
Expand Down Expand Up @@ -2106,9 +2109,6 @@ class Log(@volatile private var _dir: File,
}
}

private def lowerSegment(offset: Long): Option[LogSegment] =
Option(segments.lowerEntry(offset)).map(_.getValue)

/**
* Completely delete this log directory and all contents from the file system with no delay
*/
@@ -2173,7 +2173,7 @@
info(s"Truncating to offset $targetOffset")
lock synchronized {
checkIfMemoryMappedBufferClosed()
if (segments.firstEntry.getValue.baseOffset > targetOffset) {
if (segments.firstSegment.get.baseOffset > targetOffset) {
truncateFullyAndStartAt(targetOffset)
} else {
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
@@ -2239,49 +2239,23 @@
/**
* The active segment that is currently taking appends
*/
def activeSegment = segments.lastEntry.getValue
def activeSegment = segments.lastSegment.get

/**
* All the log segments in this log ordered from oldest to newest
*/
def logSegments: Iterable[LogSegment] = segments.values.asScala
def logSegments: Iterable[LogSegment] = segments.values

/**
* Get all segments beginning with the segment that includes "from" and ending with the segment
* that includes up to "to-1" or the end of the log (if to > logEndOffset).
*/
def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
if (from == to) {
// Handle non-segment-aligned empty sets
List.empty[LogSegment]
} else if (to < from) {
throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " +
s"from offset $from which is greater than limit offset $to")
} else {
lock synchronized {
val view = Option(segments.floorKey(from)).map { floor =>
segments.subMap(floor, to)
}.getOrElse(segments.headMap(to))
view.values.asScala
}
}
def logSegments(from: Long, to: Long): Iterable[LogSegment] = lock synchronized {
segments.values(from, to)
}

def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
lock synchronized {
if (from > activeSegment.baseOffset)
Seq.empty
else
logSegments(from, activeSegment.baseOffset)
}
}

/**
* Get the largest log segment with a base offset less than or equal to the given offset, if one exists.
* @return the optional log segment
*/
private def floorLogSegment(offset: Long): Option[LogSegment] = {
Option(segments.floorEntry(offset)).map(_.getValue)
def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = lock synchronized {
segments.nonActiveLogSegmentsFrom(from)
}
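The subMap/headMap range logic deleted above is not lost; presumably it moves verbatim into LogSegments so that range validation lives in one place. A sketch of the two delegated members under that assumption, continuing the earlier LogSegments sketch:

```scala
// Inside the LogSegments sketch above:
def values(from: Long, to: Long): Iterable[LogSegment] = {
  if (from == to) {
    // Handle non-segment-aligned empty sets
    List.empty[LogSegment]
  } else if (to < from) {
    throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " +
      s"from offset $from which is greater than limit offset $to")
  } else {
    // Start from the segment covering `from` (its floor key), or the head of the map
    val view = Option(segments.floorKey(from)).map { floor =>
      segments.subMap(floor, to)
    }.getOrElse(segments.headMap(to))
    view.values.asScala
  }
}

def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
  val activeSegment = lastSegment.get
  if (from > activeSegment.baseOffset) Seq.empty
  else values(from, activeSegment.baseOffset)
}
```

Note that `lock synchronized` stays in Log while the traversal moves into LogSegments, so the original locking discipline is preserved without the wrapper needing to know about the Log lock.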

override def toString: String = {
@@ -2399,7 +2373,7 @@
// Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
// but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
// multiple times for the same segment.
val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)
val sortedOldSegments = oldSegments.filter(seg => segments.contains(seg.baseOffset)).sortBy(_.baseOffset)

checkIfMemoryMappedBufferClosed()
// need to do this in two phases to be crash safe AND do the delete asynchronously
@@ -2451,7 +2425,7 @@
* @param segment The segment to add
*/
@threadsafe
private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)
private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.add(segment)

private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
try {