Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,010 changes: 1,010 additions & 0 deletions core/src/main/scala/kafka/log/LocalLog.scala

Large diffs are not rendered by default.

1,212 changes: 282 additions & 930 deletions core/src/main/scala/kafka/log/Log.scala

Large diffs are not rendered by default.

8 changes: 1 addition & 7 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,6 @@ object LogCleaner {
enableCleaner = config.logCleanerEnable)

}

def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LogSegment.deleteIfExists(dir, baseOffset, fileSuffix = Log.CleanedFileSuffix)
LogSegment.open(dir, baseOffset, logConfig, Time.SYSTEM,
fileSuffix = Log.CleanedFileSuffix, initFileSize = logConfig.initFileSize, preallocate = logConfig.preallocate)
}
}

/**
Expand Down Expand Up @@ -562,7 +556,7 @@ private[log] class Cleaner(val id: Int,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = LogCleaner.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
val cleaned = Log.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)

try {
Expand Down
34 changes: 26 additions & 8 deletions core/src/main/scala/kafka/log/LogLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,18 @@ case class LoadLogParams(dir: File,
* This object is responsible for all activities related with recovery of log segments from disk.
*/
object LogLoader extends Logging {

/**
* Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
* This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
* avoided by passing in the recovery point, however finding the correct position to do this
* requires accessing the offset index which may not be safe in an unclean shutdown.
* For more information see the discussion in PR#2104
*/
val CleanShutdownFile = ".kafka_cleanshutdown"

/**
* Load the log segments from the log files on disk, and return the components of the loaded log.
* Load the log segments from the log files on disk, and returns the components of the loaded log.
* Additionally, it also suitably updates the provided LeaderEpochFileCache and ProducerStateManager
* to reflect the contents of the loaded log.
*
Expand All @@ -90,7 +100,6 @@ object LogLoader extends Logging {
* overflow index offset
*/
def load(params: LoadLogParams): LoadedLogOffsets = {

// First pass: through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
val swapFiles = removeTempFilesAndCollectSwapFiles(params)
Expand Down Expand Up @@ -141,7 +150,6 @@ object LogLoader extends Logging {
}
}


// Fourth pass: load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
Expand Down Expand Up @@ -200,7 +208,6 @@ object LogLoader extends Logging {
params.time,
reloadFromCleanShutdown = params.hadCleanShutdown,
params.logIdentifier)

val activeSegment = params.segments.lastSegment.get
LoadedLogOffsets(
newLogStartOffset,
Expand Down Expand Up @@ -274,16 +281,16 @@ object LogLoader extends Logging {
} catch {
case e: LogSegmentOffsetOverflowException =>
info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
Log.splitOverflowedSegment(
val result = Log.splitOverflowedSegment(
e.segment,
params.segments,
params.dir,
params.topicPartition,
params.config,
params.scheduler,
params.logDirFailureChannel,
params.producerStateManager,
params.logIdentifier)
deleteProducerSnapshotsAsync(result.deletedSegments, params)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary since during splitting, the old segment is replaced with a new segment with the same base offset. So, result.deletedSegments is always empty.

Copy link
Copy Markdown
Contributor Author

@kowshik kowshik Jun 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Great catch. It appears straightforward to just skip deleting the snapshot here, I can leave a comment explaining why.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao I thought about this again. Correct me if I'm wrong, but it appears we may be altering existing behavior if we go down this route. Should we do it in a separate PR to isolate the change?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's fine.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created a jira to track this improvement. https://issues.apache.org/jira/browse/KAFKA-12923

}
}
throw new IllegalStateException()
Expand Down Expand Up @@ -493,14 +500,25 @@ object LogLoader extends Logging {
Log.deleteSegmentFiles(
toDelete,
asyncDelete = true,
deleteProducerStateSnapshots = true,
params.dir,
params.topicPartition,
params.config,
params.scheduler,
params.logDirFailureChannel,
params.producerStateManager,
params.logIdentifier)
deleteProducerSnapshotsAsync(segmentsToDelete, params)
}
}

private def deleteProducerSnapshotsAsync(segments: Iterable[LogSegment],
params: LoadLogParams): Unit = {
Log.deleteProducerSnapshots(segments,
params.producerStateManager,
asyncDelete = true,
params.scheduler,
params.config,
params.logDirFailureChannel,
params.dir.getParent,
params.topicPartition)
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ class LogManager(logDirs: Seq[File],
val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
threadPools.append(pool)

val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
val cleanShutdownFile = new File(dir, LogLoader.CleanShutdownFile)
if (cleanShutdownFile.exists) {
info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
// Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
Expand Down Expand Up @@ -516,7 +516,7 @@ class LogManager(logDirs: Seq[File],

// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
}
}
} finally {
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/kafka/log/LogSegment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ class LogSegment private[log] (val log: FileRecords,
txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
}

def hasSuffix(suffix: String): Boolean = {
log.file.getName.endsWith(suffix) &&
lazyOffsetIndex.file.getName.endsWith(suffix) &&
lazyTimeIndex.file.getName.endsWith(suffix) &&
txnIndex.file.getName.endsWith(suffix)
}

/**
* Append the largest time index entry to the time index and trim the log and indexes.
*
Expand Down Expand Up @@ -624,6 +631,10 @@ class LogSegment private[log] (val log: FileRecords,
))
}

def deleted(): Boolean = {
!log.file.exists() && !lazyOffsetIndex.file.exists() && !lazyTimeIndex.file.exists() && !txnIndex.file.exists()
}

/**
* The last modified time of this log segment as a unix time stamp
*/
Expand Down
37 changes: 37 additions & 0 deletions core/src/main/scala/kafka/log/LogSegments.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ class LogSegments(topicPartition: TopicPartition) {
@threadsafe
def firstSegment: Option[LogSegment] = firstEntry.map(_.getValue)

/**
* @return the base offset of the log segment associated with the smallest offset, if it exists
*/
private[log] def firstSegmentBaseOffset: Option[Long] = firstSegment.map(_.baseOffset)

/**
* @return the entry associated with the greatest offset, if it exists.
*/
Expand All @@ -228,4 +233,36 @@ class LogSegments(topicPartition: TopicPartition) {
}.getOrElse(collection.immutable.Map[Long, LogSegment]().asJava)
view.values.asScala
}

/**
* The active segment that is currently taking appends
*/
def activeSegment = lastSegment.get

def sizeInBytes: Long = LogSegments.sizeInBytes(values)

/**
* Returns an Iterable containing segments matching the provided predicate.
*
* @param predicate the predicate to be used for filtering segments.
*/
def filter(predicate: LogSegment => Boolean): Iterable[LogSegment] = values.filter(predicate)
}

object LogSegments {
/**
* Calculate a log's size (in bytes) from the provided log segments.
*
* @param segments The log segments to calculate the size of
* @return Sum of the log segments' sizes (in bytes)
*/
def sizeInBytes(segments: Iterable[LogSegment]): Long =
segments.map(_.size.toLong).sum

def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] = {
segments.map {
segment =>
segment.getFirstBatchTimestamp()
}
}
}
23 changes: 8 additions & 15 deletions core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ class PartitionLockTest extends Logging {
maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
logDirFailureChannel)
new SlowLog(log, offsets.logStartOffset, localLog, leaderEpochCache, producerStateManager, appendSemaphore)
}
}
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
Expand Down Expand Up @@ -365,28 +368,18 @@ class PartitionLockTest extends Logging {

private class SlowLog(
log: Log,
segments: LogSegments,
offsets: LoadedLogOffsets,
logStartOffset: Long,
localLog: LocalLog,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
mockTime: MockTime,
logDirFailureChannel: LogDirFailureChannel,
appendSemaphore: Semaphore
) extends Log(
log.dir,
log.config,
segments,
offsets.logStartOffset,
offsets.recoveryPoint,
offsets.nextOffsetMetadata,
mockTime.scheduler,
logStartOffset,
localLog,
new BrokerTopicStats,
mockTime,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
_topicId = None,
keepPartitionMetadataFile = true) {

Expand Down
23 changes: 8 additions & 15 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ class PartitionTest extends AbstractPartitionTest {
maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
logDirFailureChannel)
new SlowLog(log, offsets.logStartOffset, localLog, leaderEpochCache, producerStateManager, appendSemaphore)
}
}

Expand Down Expand Up @@ -2030,28 +2033,18 @@ class PartitionTest extends AbstractPartitionTest {

private class SlowLog(
log: Log,
segments: LogSegments,
offsets: LoadedLogOffsets,
logStartOffset: Long,
localLog: LocalLog,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
mockTime: MockTime,
logDirFailureChannel: LogDirFailureChannel,
appendSemaphore: Semaphore
) extends Log(
log.dir,
log.config,
segments,
offsets.logStartOffset,
offsets.recoveryPoint,
offsets.nextOffsetMetadata,
mockTime.scheduler,
logStartOffset,
localLog,
new BrokerTopicStats,
mockTime,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
_topicId = None,
keepPartitionMetadataFile = true) {

Expand Down
Loading