checkstyle/suppressions.xml (2 additions, 2 deletions)
@@ -314,9 +314,9 @@
    <suppress checks="(CyclomaticComplexity|NPathComplexity)"
              files="(PartitionRegistration|PartitionChangeBuilder).java"/>
    <suppress checks="CyclomaticComplexity"
-             files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager).java"/>
+             files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
    <suppress checks="NPathComplexity"
-             files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager|MetadataDelta).java"/>
+             files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager|MetadataDelta|MetaPropertiesEnsemble).java"/>
    <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
              files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
    <suppress checks="BooleanExpressionComplexity"
core/src/main/scala/kafka/log/LogManager.scala (21 additions, 26 deletions)
@@ -18,14 +18,14 @@
 package kafka.log
 
 import java.io._
-import java.nio.file.Files
+import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.ConfigRepository
 import kafka.server._
 import kafka.utils._
-import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
 
@@ -35,6 +35,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils}
 
 import java.util.{OptionalLong, Properties}
 import org.apache.kafka.server.common.MetadataVersion
@@ -122,9 +123,7 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
-  private val dirIds = directoryIds(liveLogDirs)
-  // visible for testing
-  private[log] val directoryIds: Set[Uuid] = dirIds.values.toSet
+  val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
@@ -272,35 +271,31 @@
   /**
    * Retrieves the Uuid for the directory, given its absolute path.
    */
-  def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+  def directoryId(dir: String): Option[Uuid] = directoryIds.get(dir)
 
   /**
    * Determine directory ID for each directory with a meta.properties.
    * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties.
    * Directories without a meta.properties don't get a directory ID assigned.
    */
-  private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
-    dirs.flatMap { dir =>
+  private def loadDirectoryIds(logDirs: Seq[File]): Map[String, Uuid] = {
+    val result = mutable.HashMap[String, Uuid]()
+    logDirs.foreach(logDir => {
       try {
-        val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile))
-        metadataCheckpoint.read().map { props =>
-          val rawMetaProperties = new RawMetaProperties(props)
-          val uuid = rawMetaProperties.directoryId match {
-            case Some(uuidStr) => Uuid.fromString(uuidStr)
-            case None =>
-              val uuid = DirectoryId.random()
-              rawMetaProperties.directoryId = uuid.toString
-              metadataCheckpoint.write(rawMetaProperties.props)
-              uuid
-          }
-          dir.getAbsolutePath -> uuid
-        }.toMap
+        val props = PropertiesUtils.readPropertiesFile(
+          new File(logDir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath)
+        val metaProps = new MetaProperties.Builder(props).build()
+        metaProps.directoryId().ifPresent(directoryId => {
+          result += (logDir.getAbsolutePath -> directoryId)
+        })
       } catch {
-        case e: IOException =>
-          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while loading ID $dir", e)
-          None
-      }
-    }.toMap
+        case e: NoSuchFileException =>
+          info(s"No meta.properties file found in ${logDir}.")
+        case e: IOException =>
+          logDirFailureChannel.maybeAddOfflineLogDir(logDir.getAbsolutePath, s"Disk error while loading ID $logDir", e)
+      }
+    })
+    result
   }
 
   private def addLogToBeDeleted(log: UnifiedLog): Unit = {
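
For reference, here is a minimal, self-contained sketch (not part of the PR) of the read path the new loadDirectoryIds follows: parse <logDir>/meta.properties with PropertiesUtils, wrap it in MetaProperties, and pick up the optional directory.id. The object and method names (DirectoryIdReader, readDirectoryId) are illustrative only, and the offline-directory handling done via LogDirFailureChannel in LogManager is omitted here.

// Illustrative sketch only; DirectoryIdReader/readDirectoryId are not names from the PR.
import java.io.File
import java.nio.file.NoSuchFileException
import org.apache.kafka.common.Uuid
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils}

object DirectoryIdReader {
  // Returns the directory ID from <logDir>/meta.properties, or None if the file
  // is missing or carries no directory.id entry.
  def readDirectoryId(logDir: File): Option[Uuid] = {
    try {
      val props = PropertiesUtils.readPropertiesFile(
        new File(logDir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath)
      val metaProps = new MetaProperties.Builder(props).build()
      // directoryId() is a java.util.Optional[Uuid]; translate it to a Scala Option.
      if (metaProps.directoryId().isPresent) Some(metaProps.directoryId().get()) else None
    } catch {
      case _: NoSuchFileException => None // the directory has not been formatted yet
    }
  }
}
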
core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala (0 additions, 273 deletions)

This file was deleted.
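
With the Scala BrokerMetadataCheckpoint reader/writer removed, meta.properties handling lives in the Java org.apache.kafka.metadata.properties package. As a rough, hedged sketch of the replacement path only: the Loader methods shown below (addLogDirs, load), along with the object and method names (MetaPropertiesLoadSketch, loadAll), are assumptions about how MetaPropertiesEnsemble is used outside this diff, not something the diff itself shows.

// Hedged sketch: assumes MetaPropertiesEnsemble.Loader exposes addLogDirs()/load();
// check the actual API in org.apache.kafka.metadata.properties before relying on it.
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import scala.jdk.CollectionConverters._

object MetaPropertiesLoadSketch {
  // Reads meta.properties from every given log directory into one ensemble view.
  def loadAll(logDirs: Seq[String]): MetaPropertiesEnsemble =
    new MetaPropertiesEnsemble.Loader()
      .addLogDirs(logDirs.asJava)
      .load()
}
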
