diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala index dfe29d1ca35ec..c5ba26fdd970d 100755 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -157,7 +157,8 @@ case class MetaProperties( object BrokerMetadataCheckpoint extends Logging { def getBrokerMetadataAndOfflineDirs( logDirs: collection.Seq[String], - ignoreMissing: Boolean + ignoreMissing: Boolean, + kraftMode: Boolean ): (RawMetaProperties, collection.Seq[String]) = { require(logDirs.nonEmpty, "Must have at least one log dir to read meta.properties") @@ -188,8 +189,13 @@ object BrokerMetadataCheckpoint extends Logging { if (brokerMetadataMap.isEmpty) { (new RawMetaProperties(), offlineDirs) } else { - val parsedProperties = brokerMetadataMap.values.map(props => MetaProperties.parse(new RawMetaProperties(props))) - val numDistinctMetaProperties = parsedProperties.toSet.size + // KRaft mode has to support handling both meta.properties versions 0 and 1 and has to + // reconcile have multiple versions in different directories. + val numDistinctMetaProperties = if (kraftMode) { + brokerMetadataMap.values.map(props => MetaProperties.parse(new RawMetaProperties(props))).toSet.size + } else { + brokerMetadataMap.values.toSet.size + } if (numDistinctMetaProperties > 1) { val builder = new StringBuilder diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index f5f2c12197531..b10f48ba77498 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -143,7 +143,7 @@ object KafkaRaftServer { def initializeLogDirs(config: KafkaConfig): (MetaProperties, BootstrapMetadata, Seq[String]) = { val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint. - getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false) + getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, kraftMode = true) if (offlineDirs.contains(config.metadataLogDir)) { throw new KafkaException("Cannot start server since `meta.properties` could not be " + diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 30a96537cc1b6..ea1f6b05956a2 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -224,7 +224,7 @@ class KafkaServer( /* load metadata */ val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = - BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, ignoreMissing = true) + BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, ignoreMissing = true, kraftMode = false) if (preloadedBrokerMetadataCheckpoint.version != 0) { throw new RuntimeException(s"Found unexpected version in loaded `meta.properties`: " + diff --git a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala index c7ce0ac19cb8b..f77ca6c1b6554 100644 --- a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala +++ b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala @@ -12,15 +12,18 @@ */ package kafka.server -import java.io.File -import java.util.Properties +import kafka.utils.{CoreUtils, Logging} +import java.io.{File, FileOutputStream} +import java.util.Properties import org.apache.kafka.common.utils.Utils import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test -class BrokerMetadataCheckpointTest { +import scala.collection.mutable + +class BrokerMetadataCheckpointTest extends Logging { private val clusterIdBase64 = "H3KKO4NTRPaCWtEmm3vW7A" @Test @@ -112,16 +115,85 @@ class BrokerMetadataCheckpointTest { assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties)) } + @Test + def testMetaPropertiesWithVersionZero(): Unit = { + val properties = new RawMetaProperties() + properties.version = 0 + properties.clusterId = clusterIdBase64 + properties.brokerId = 5 + val metaProps = MetaProperties.parse(properties) + assertEquals(clusterIdBase64, metaProps.clusterId) + assertEquals(5, metaProps.nodeId) + } + + @Test + def testValidMetaPropertiesWithMultipleVersionsInLogDirs(): Unit = { + // Let's create two directories with meta.properties one in v0 and v1. + val props1 = new RawMetaProperties() + props1.version = 0 + props1.clusterId = clusterIdBase64 + props1.brokerId = 5 + val props2 = new RawMetaProperties() + props2.version = 1 + props2.clusterId = clusterIdBase64 + props2.nodeId = 5 + for (ignoreMissing <- Seq(true, false)) { + val (metaProps, offlineDirs) = getMetadataWithMultipleMetaPropLogDirs(Seq(props1, props2), ignoreMissing, kraftMode = true) + assertEquals(MetaProperties.parse(props2), MetaProperties.parse(metaProps)) + assertEquals(Seq.empty, offlineDirs) + } + } + + @Test + def testInvalidMetaPropertiesWithMultipleVersionsInLogDirs(): Unit = { + // Let's create two directories with meta.properties one in v0 and v1. + val props1 = new RawMetaProperties() + props1.version = 0 + props1.brokerId = 5 + val props2 = new RawMetaProperties() + props2.version = 1 + props2.clusterId = clusterIdBase64 + props2.nodeId = 5 + for (ignoreMissing <- Seq(true, false)) { + assertThrows(classOf[RuntimeException], + () => getMetadataWithMultipleMetaPropLogDirs(Seq(props1, props2), ignoreMissing, kraftMode = true)) + } + } + + private def getMetadataWithMultipleMetaPropLogDirs(metaProperties: Seq[RawMetaProperties], + ignoreMissing: Boolean, + kraftMode: Boolean): (RawMetaProperties, collection.Seq[String]) = { + val logDirs = mutable.Buffer[File]() + try { + for (mp <- metaProperties) { + val logDir = TestUtils.tempDirectory() + logDirs += logDir + val propFile = new File(logDir.getAbsolutePath, "meta.properties") + val fs = new FileOutputStream(propFile) + try { + mp.props.store(fs, "") + fs.flush() + fs.getFD.sync() + } finally { + Utils.closeQuietly(fs, propFile.getName) + } + } + BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(logDirs.map(_.getAbsolutePath), ignoreMissing, kraftMode) + } finally { + logDirs.foreach(logDir => CoreUtils.swallow(Utils.delete(logDir), this)) + } + } + @Test def testGetBrokerMetadataAndOfflineDirsWithNonexistentDirectories(): Unit = { // Use a regular file as an invalid log dir to trigger an IO error val invalidDir = TestUtils.tempFile("blah") try { - // The `ignoreMissing` flag has no effect if there is an IO error + // The `ignoreMissing` and `kraftMode` flag has no effect if there is an IO error testEmptyGetBrokerMetadataAndOfflineDirs(invalidDir, - expectedOfflineDirs = Seq(invalidDir), ignoreMissing = true) + expectedOfflineDirs = Seq(invalidDir), ignoreMissing = true, kraftMode = true) testEmptyGetBrokerMetadataAndOfflineDirs(invalidDir, - expectedOfflineDirs = Seq(invalidDir), ignoreMissing = false) + expectedOfflineDirs = Seq(invalidDir), ignoreMissing = false, kraftMode = true) } finally { Utils.delete(invalidDir) } @@ -132,11 +204,16 @@ class BrokerMetadataCheckpointTest { val tempDir = TestUtils.tempDirectory() try { testEmptyGetBrokerMetadataAndOfflineDirs(tempDir, - expectedOfflineDirs = Seq(), ignoreMissing = true) + expectedOfflineDirs = Seq(), ignoreMissing = true, kraftMode = true) + testEmptyGetBrokerMetadataAndOfflineDirs(tempDir, + expectedOfflineDirs = Seq(), ignoreMissing = true, kraftMode = false) assertThrows(classOf[RuntimeException], () => BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs( - Seq(tempDir.getAbsolutePath), false)) + Seq(tempDir.getAbsolutePath), ignoreMissing = false, kraftMode = false)) + assertThrows(classOf[RuntimeException], + () => BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs( + Seq(tempDir.getAbsolutePath), ignoreMissing = false, kraftMode = true)) } finally { Utils.delete(tempDir) } @@ -145,10 +222,11 @@ class BrokerMetadataCheckpointTest { private def testEmptyGetBrokerMetadataAndOfflineDirs( logDir: File, expectedOfflineDirs: Seq[File], - ignoreMissing: Boolean + ignoreMissing: Boolean, + kraftMode: Boolean ): Unit = { val (metaProperties, offlineDirs) = BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs( - Seq(logDir.getAbsolutePath), ignoreMissing) + Seq(logDir.getAbsolutePath), ignoreMissing, kraftMode = false) assertEquals(expectedOfflineDirs.map(_.getAbsolutePath), offlineDirs) assertEquals(new Properties(), metaProperties.props) }