Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ systest/
clients/src/generated
clients/src/generated-test
jmh-benchmarks/generated
jmh-benchmarks/src/main/generated
streams/src/generated
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1540,10 +1540,14 @@ project(':jmh-benchmarks') {
compile project(':core')
compile project(':clients')
compile project(':streams')
compile project(':core')
compile project(':clients').sourceSets.test.output
compile project(':core').sourceSets.test.output
compile libs.jmhCore
compile libs.mockitoCore
annotationProcessor libs.jmhGeneratorAnnProcess
compile libs.jmhCoreBenchmarks
compile libs.mockitoCore
compile libs.slf4jlog4j
}

jar {
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-jmh-benchmarks.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<allow pkg="kafka.controller"/>
<allow pkg="kafka.coordinator"/>
<allow pkg="kafka.network"/>
<allow pkg="kafka.utils"/>
<allow pkg="kafka.zk"/>
<allow class="kafka.utils.Pool"/>
<allow class="kafka.utils.KafkaScheduler"/>
Expand Down
11 changes: 0 additions & 11 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,6 @@
</subpackage>
</subpackage>

<subpackage name="jmh">
<allow pkg="org.openjdk.jmh.annotations" />
<allow pkg="org.openjdk.jmh.runner" />
<allow pkg="org.openjdk.jmh.runner.options" />
<allow pkg="org.openjdk.jmh.infra" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.streams" />
<allow pkg="org.github.jamm" />
</subpackage>

<subpackage name="log4jappender">
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka.clients" />
Expand Down
38 changes: 17 additions & 21 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.{Optional, Properties}

import kafka.api.{ApiVersion, LeaderAndIsr, Request}
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.KafkaController
import kafka.log._
Expand Down Expand Up @@ -266,51 +266,51 @@ class Partition(val topicPartition: TopicPartition,
// current replica and the existence of the future replica, no other thread can update the log directory of the
// current replica or remove the future replica.
inWriteLock(leaderIsrUpdateLock) {
val currentLogDir = localLogOrException.dir.getParent
val currentLogDir = localLogOrException.parentDir
if (currentLogDir == logDir) {
info(s"Current log directory $currentLogDir is same as requested log dir $logDir. " +
s"Skipping future replica creation.")
false
} else {
futureLog match {
case Some(partitionFutureLog) =>
val futureLogDir = partitionFutureLog.dir.getParent
val futureLogDir = partitionFutureLog.parentDir
if (futureLogDir != logDir)
throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " +
s"different from the requested log dir $logDir")
false
case None =>
createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
true
}
}
}
}

def createLogIfNotExists(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
isFutureReplica match {
case true if futureLog.isEmpty =>
val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
this.futureLog = Option(log)
case false if log.isEmpty =>
val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
this.log = Option(log)
case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
}
}

// Visible for testing
private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val fetchLogConfig = () => {
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
def fetchLogConfig: LogConfig = {
val props = stateStore.fetchTopicConfig()
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
}

logManager.initializingLog(topicPartition)
var maybeLog: Option[Log] = None
try {
val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica)
val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig, isNew, isFutureReplica)
val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
}
Expand All @@ -319,7 +319,7 @@ class Partition(val topicPartition: TopicPartition,
maybeLog = Some(log)
log
} finally {
logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig)
logManager.finishedInitializingLog(topicPartition, maybeLog, () => fetchLogConfig)
}
}

Expand Down Expand Up @@ -410,7 +410,7 @@ class Partition(val topicPartition: TopicPartition,

def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
inReadLock(leaderIsrUpdateLock) {
futureLog.exists(_.dir.getParent != newDestinationDir)
futureLog.exists(_.parentDir != newDestinationDir)
}
}

Expand Down Expand Up @@ -478,9 +478,7 @@ class Partition(val topicPartition: TopicPartition,
* from the time when this broker was the leader last time) and setting the new leader and ISR.
* If the leader replica id does not change, return false to indicate the replica manager.
*/
def makeLeader(controllerId: Int,
partitionState: LeaderAndIsrPartitionState,
correlationId: Int,
def makeLeader(partitionState: LeaderAndIsrPartitionState,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
Expand All @@ -493,7 +491,7 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
)
createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)

val leaderLog = localLogOrException
val leaderEpochStartOffset = leaderLog.logEndOffset
Expand Down Expand Up @@ -549,9 +547,7 @@ class Partition(val topicPartition: TopicPartition,
* greater (that is, no updates have been missed), return false to indicate to the
* replica manager that state is already correct and the become-follower steps can be skipped
*/
def makeFollower(controllerId: Int,
partitionState: LeaderAndIsrPartitionState,
correlationId: Int,
def makeFollower(partitionState: LeaderAndIsrPartitionState,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newLeaderBrokerId = partitionState.leader
Expand All @@ -566,7 +562,7 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
)
createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)

leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
Expand Down
63 changes: 37 additions & 26 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ object RollParams {
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
*
* @param dir The directory in which log segments are created.
* @param _dir The directory in which log segments are created.
* @param config The log configuration settings
* @param logStartOffset The earliest offset allowed to be exposed to kafka client.
* The logStartOffset can be updated by :
Expand All @@ -209,7 +209,7 @@ object RollParams {
* @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
*/
@threadsafe
class Log(@volatile var dir: File,
class Log(@volatile private var _dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
Expand All @@ -228,36 +228,17 @@ class Log(@volatile var dir: File,

/* A lock that guards all modifications to the log */
private val lock = new Object

// The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
// After memory mapped buffer is closed, no disk IO operation should be performed for this log
@volatile private var isMemoryMappedBufferClosed = false

// Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
@volatile private var _parentDir: String = dir.getParent

/* last time it was flushed */
private val lastFlushedTime = new AtomicLong(time.milliseconds)

def initFileSize: Int = {
if (config.preallocate)
config.segmentSize
else
0
}

def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = this.config
this.config = newConfig
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
if (newRecordVersion.value != oldRecordVersion.value)
initializeLeaderEpochCache()
}

private def checkIfMemoryMappedBufferClosed(): Unit = {
if (isMemoryMappedBufferClosed)
throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
}

@volatile private var nextOffsetMetadata: LogOffsetMetadata = _

/* The earliest offset which is part of an incomplete transaction. This is used to compute the
Expand Down Expand Up @@ -316,6 +297,35 @@ class Log(@volatile var dir: File,
s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
}

def dir: File = _dir

def parentDir: String = _parentDir

def parentDirFile: File = new File(_parentDir)

def initFileSize: Int = {
if (config.preallocate)
config.segmentSize
else
0
}

def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = this.config
this.config = newConfig
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
if (newRecordVersion.value != oldRecordVersion.value)
initializeLeaderEpochCache()
}

private def checkIfMemoryMappedBufferClosed(): Unit = {
if (isMemoryMappedBufferClosed)
throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
}

def highWatermark: Long = highWatermarkMetadata.messageOffset

/**
Expand Down Expand Up @@ -961,7 +971,8 @@ class Log(@volatile var dir: File,
val renamedDir = new File(dir.getParent, name)
Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
if (renamedDir != dir) {
dir = renamedDir
_dir = renamedDir
_parentDir = renamedDir.getParent
logSegments.foreach(_.updateDir(renamedDir))
producerStateManager.logDir = dir
// re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ class LogCleaner(initialConfig: CleanerConfig,
} catch {
case e: LogCleaningException =>
warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)
cleanerManager.markPartitionUncleanable(e.log.parentDir, e.log.topicPartition)

false
}
Expand Down Expand Up @@ -365,11 +365,11 @@ class LogCleaner(initialConfig: CleanerConfig,
case _: LogCleaningAbortedException => // task can be aborted, let it go.
case _: KafkaStorageException => // partition is already offline. let it go.
case e: IOException =>
val logDirectory = cleanable.log.dir.getParent
val logDirectory = cleanable.log.parentDir
val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
} finally {
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.parentDirFile, endOffset)
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogCleanerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
// update checkpoint for logs with invalid checkpointed offsets
if (offsetsToClean.forceUpdateCheckpoint)
updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset))
updateCheckpoints(log.parentDirFile, Option(topicPartition, offsetsToClean.firstDirtyOffset))
val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)

Expand Down Expand Up @@ -379,7 +379,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case Some(offset) =>
// Remove this partition from the checkpoint file in the source log directory
updateCheckpoints(sourceLogDir, None)
// Add offset for this partition to the checkpoint file in the source log directory
// Add offset for this partition to the checkpoint file in the destination log directory
updateCheckpoints(destLogDir, Option(topicPartition, offset))
case None =>
}
Expand Down Expand Up @@ -478,7 +478,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],

private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
inLock(lock) {
uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition))
uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
}
}
}
Expand Down
Loading