Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ class Partition(val topicPartition: TopicPartition,
// is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
// completes and a switch to new location is performed.
// log and futureLog variables defined below are used to capture this
@volatile var log: Option[Log] = None
@volatile var log: Option[UnifiedLog] = None
// If ReplicaAlterLogDir command is in progress, this is future location of the log
@volatile var futureLog: Option[Log] = None
@volatile var futureLog: Option[UnifiedLog] = None

/* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
* One way of doing that is through the controller's start replica state change command. When a new broker starts up
Expand Down Expand Up @@ -313,10 +313,10 @@ class Partition(val topicPartition: TopicPartition,
}

def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
def maybeCreate(logOpt: Option[Log]): Log = {
def maybeCreate(logOpt: Option[UnifiedLog]): UnifiedLog = {
logOpt match {
case Some(log) =>
trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
trace(s"${if (isFutureReplica) "Future UnifiedLog" else "UnifiedLog"} already exists.")
if (log.topicId.isEmpty)
topicId.foreach(log.assignTopicId)
log
Expand All @@ -333,8 +333,8 @@ class Partition(val topicPartition: TopicPartition,
}

// Visible for testing
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Log = {
def updateHighWatermark(log: Log) = {
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
def updateHighWatermark(log: UnifiedLog) = {
val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
Expand All @@ -344,7 +344,7 @@ class Partition(val topicPartition: TopicPartition,
}

logManager.initializingLog(topicPartition)
var maybeLog: Option[Log] = None
var maybeLog: Option[UnifiedLog] = None
try {
val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
maybeLog = Some(log)
Expand Down Expand Up @@ -373,7 +373,7 @@ class Partition(val topicPartition: TopicPartition,
}

private def getLocalLog(currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Either[Log, Errors] = {
requireLeader: Boolean): Either[UnifiedLog, Errors] = {
checkCurrentLeaderEpoch(currentLeaderEpoch) match {
case Errors.NONE =>
if (requireLeader && !isLeader) {
Expand All @@ -391,17 +391,17 @@ class Partition(val topicPartition: TopicPartition,
}
}

def localLogOrException: Log = log.getOrElse {
def localLogOrException: UnifiedLog = log.getOrElse {
throw new NotLeaderOrFollowerException(s"Log for partition $topicPartition is not available " +
s"on broker $localBrokerId")
}

def futureLocalLogOrException: Log = futureLog.getOrElse {
def futureLocalLogOrException: UnifiedLog = futureLog.getOrElse {
throw new NotLeaderOrFollowerException(s"Future log for partition $topicPartition is not available " +
s"on broker $localBrokerId")
}

def leaderLogIfLocal: Option[Log] = {
def leaderLogIfLocal: Option[UnifiedLog] = {
log.filter(_ => isLeader)
}

Expand All @@ -411,7 +411,7 @@ class Partition(val topicPartition: TopicPartition,
def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)

private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Log = {
requireLeader: Boolean): UnifiedLog = {
getLocalLog(currentLeaderEpoch, requireLeader) match {
case Left(localLog) => localLog
case Right(error) =>
Expand All @@ -422,7 +422,7 @@ class Partition(val topicPartition: TopicPartition,
}

// Visible for testing -- Used by unit tests to set log for this partition
def setLog(log: Log, isFutureLog: Boolean): Unit = {
def setLog(log: UnifiedLog, isFutureLog: Boolean): Unit = {
if (isFutureLog)
futureLog = Some(log)
else
Expand Down Expand Up @@ -576,9 +576,9 @@ class Partition(val topicPartition: TopicPartition,
remoteReplicas.foreach { replica =>
replica.updateFetchState(
followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
followerStartOffset = Log.UnknownOffset,
followerStartOffset = UnifiedLog.UnknownOffset,
followerFetchTimeMs = 0L,
leaderEndOffset = Log.UnknownOffset)
leaderEndOffset = UnifiedLog.UnknownOffset)
}
}
// we may need to increment high watermark since ISR could be down to 1
Expand Down Expand Up @@ -843,7 +843,7 @@ class Partition(val topicPartition: TopicPartition,
*
* @return true if the HW was incremented, and false otherwise.
*/
private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, curTime: Long = time.milliseconds): Boolean = {
// maybeIncrementLeaderHW is in the hot path, the following code is written to
// avoid unnecessary collection generation
var newHighWatermark = leaderLog.logEndOffsetMetadata
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Replica.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka.cluster

import kafka.log.Log
import kafka.log.UnifiedLog
import kafka.server.LogOffsetMetadata
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
Expand All @@ -28,7 +28,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
@volatile private[this] var _logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
// the log start offset value, kept in all replicas;
// for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var _logStartOffset = Log.UnknownOffset
@volatile private[this] var _logStartOffset = UnifiedLog.UnknownOffset

// The log end offset value at the time the leader received the last FetchRequest from this follower
// This is used to determine the lastCaughtUpTimeMs of the follower
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ import scala.util.control.ControlThrowable
*/
class LogCleaner(initialConfig: CleanerConfig,
val logDirs: Seq[File],
val logs: Pool[TopicPartition, Log],
val logs: Pool[TopicPartition, UnifiedLog],
val logDirFailureChannel: LogDirFailureChannel,
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable
{
Expand Down Expand Up @@ -272,7 +272,7 @@ class LogCleaner(initialConfig: CleanerConfig,
* retention threads need to make this call to obtain:
* @return A list of log partitions that retention threads can safely work on
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
cleanerManager.pauseCleaningForNonCompactedPartitions()
}

Expand Down Expand Up @@ -356,7 +356,7 @@ class LogCleaner(initialConfig: CleanerConfig,
case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
}
}
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
val deletable: Iterable[(TopicPartition, UnifiedLog)] = cleanerManager.deletableLogs()
try {
deletable.foreach { case (_, log) =>
try {
Expand Down Expand Up @@ -549,14 +549,14 @@ private[log] class Cleaner(val id: Int,
* @param transactionMetadata State of ongoing transactions which is carried between the cleaning
* of the grouped segments
*/
private[log] def cleanSegments(log: Log,
private[log] def cleanSegments(log: UnifiedLog,
segments: Seq[LogSegment],
map: OffsetMap,
deleteHorizonMs: Long,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = Log.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)

try {
Expand Down Expand Up @@ -876,7 +876,7 @@ private[log] class Cleaner(val id: Int,
* @param map The map in which to store the mappings
* @param stats Collector for cleaning statistics
*/
private[log] def buildOffsetMap(log: Log,
private[log] def buildOffsetMap(log: UnifiedLog,
start: Long,
end: Long,
map: OffsetMap,
Expand Down Expand Up @@ -1063,7 +1063,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
* and whether it needs compaction immediately.
*/
private case class LogToClean(topicPartition: TopicPartition,
log: Log,
log: UnifiedLog,
firstDirtyOffset: Long,
uncleanableOffset: Long,
needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/kafka/log/LogCleanerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState

private[log] class LogCleaningException(val log: Log,
private[log] class LogCleaningException(val log: UnifiedLog,
private val message: String,
private val cause: Throwable) extends KafkaException(message, cause)

Expand All @@ -59,7 +59,7 @@ private[log] class LogCleaningException(val log: Log,
* Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
*/
private[log] class LogCleanerManager(val logDirs: Seq[File],
val logs: Pool[TopicPartition, Log],
val logs: Pool[TopicPartition, UnifiedLog],
val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
import LogCleanerManager._

Expand Down Expand Up @@ -216,7 +216,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
* switch topic configuration between compacted and non-compacted topic.
* @return retention logs that have log cleaning successfully paused
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
inLock(lock) {
val deletableLogs = logs.filter {
case (_, log) => !log.config.compact // pick non-compacted logs
Expand All @@ -236,7 +236,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
* Include logs without delete enabled, as they may have segments
* that precede the start offset.
*/
def deletableLogs(): Iterable[(TopicPartition, Log)] = {
def deletableLogs(): Iterable[(TopicPartition, UnifiedLog)] = {
inLock(lock) {
val toClean = logs.filter { case (topicPartition, log) =>
!inProgress.contains(topicPartition) && log.config.compact &&
Expand Down Expand Up @@ -506,7 +506,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}

private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
private def isUncleanablePartition(log: UnifiedLog, topicPartition: TopicPartition): Boolean = {
inLock(lock) {
uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
}
Expand All @@ -529,15 +529,15 @@ private case class OffsetsToClean(firstDirtyOffset: Long,

private[log] object LogCleanerManager extends Logging {

def isCompactAndDelete(log: Log): Boolean = {
def isCompactAndDelete(log: UnifiedLog): Boolean = {
log.config.compact && log.config.delete
}

/**
* get max delay between the time when log is required to be compacted as determined
* by maxCompactionLagMs and the current time.
*/
def maxCompactionDelay(log: Log, firstDirtyOffset: Long, now: Long) : Long = {
def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).filter(_ > 0)

Expand All @@ -564,7 +564,7 @@ private[log] object LogCleanerManager extends Logging {
* @param now the current time in milliseconds of the cleaning operation
* @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
*/
def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
def cleanableOffsets(log: UnifiedLog, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
// reset to the log starting offset and log the error
val (firstDirtyOffset, forceUpdateCheckpoint) = {
Expand Down Expand Up @@ -626,7 +626,7 @@ private[log] object LogCleanerManager extends Logging {
* Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
* @return the biggest uncleanable offset and the total amount of cleanable bytes
*/
def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
def calculateCleanableBytes(log: UnifiedLog, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
val firstUncleanableOffset = firstUncleanableSegment.baseOffset
val cleanableBytes = log.logSegments(math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset).map(_.size.toLong).sum
Expand Down
Loading