Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ case class MetaProperties(
object BrokerMetadataCheckpoint extends Logging {
def getBrokerMetadataAndOfflineDirs(
logDirs: collection.Seq[String],
ignoreMissing: Boolean
ignoreMissing: Boolean,
kraftMode: Boolean
): (RawMetaProperties, collection.Seq[String]) = {
require(logDirs.nonEmpty, "Must have at least one log dir to read meta.properties")

Expand Down Expand Up @@ -188,8 +189,13 @@ object BrokerMetadataCheckpoint extends Logging {
if (brokerMetadataMap.isEmpty) {
(new RawMetaProperties(), offlineDirs)
} else {
val parsedProperties = brokerMetadataMap.values.map(props => MetaProperties.parse(new RawMetaProperties(props)))
val numDistinctMetaProperties = parsedProperties.toSet.size
// KRaft mode has to support handling both meta.properties versions 0 and 1 and has to
// reconcile have multiple versions in different directories.
val numDistinctMetaProperties = if (kraftMode) {
brokerMetadataMap.values.map(props => MetaProperties.parse(new RawMetaProperties(props))).toSet.size
} else {
brokerMetadataMap.values.toSet.size
}
if (numDistinctMetaProperties > 1) {
val builder = new StringBuilder

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ object KafkaRaftServer {
def initializeLogDirs(config: KafkaConfig): (MetaProperties, BootstrapMetadata, Seq[String]) = {
val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false)
getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, kraftMode = true)

if (offlineDirs.contains(config.metadataLogDir)) {
throw new KafkaException("Cannot start server since `meta.properties` could not be " +
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class KafkaServer(

/* load metadata */
val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) =
BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, ignoreMissing = true)
BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, ignoreMissing = true, kraftMode = false)

if (preloadedBrokerMetadataCheckpoint.version != 0) {
throw new RuntimeException(s"Found unexpected version in loaded `meta.properties`: " +
Expand Down
98 changes: 88 additions & 10 deletions core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
*/
package kafka.server

import java.io.File
import java.util.Properties
import kafka.utils.{CoreUtils, Logging}

import java.io.{File, FileOutputStream}
import java.util.Properties
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

class BrokerMetadataCheckpointTest {
import scala.collection.mutable

class BrokerMetadataCheckpointTest extends Logging {
private val clusterIdBase64 = "H3KKO4NTRPaCWtEmm3vW7A"

@Test
Expand Down Expand Up @@ -112,16 +115,85 @@ class BrokerMetadataCheckpointTest {
assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
}

@Test
def testMetaPropertiesWithVersionZero(): Unit = {
val properties = new RawMetaProperties()
properties.version = 0
properties.clusterId = clusterIdBase64
properties.brokerId = 5
val metaProps = MetaProperties.parse(properties)
assertEquals(clusterIdBase64, metaProps.clusterId)
assertEquals(5, metaProps.nodeId)
}

@Test
def testValidMetaPropertiesWithMultipleVersionsInLogDirs(): Unit = {
// Let's create two directories with meta.properties one in v0 and v1.
val props1 = new RawMetaProperties()
props1.version = 0
props1.clusterId = clusterIdBase64
props1.brokerId = 5
val props2 = new RawMetaProperties()
props2.version = 1
props2.clusterId = clusterIdBase64
props2.nodeId = 5
for (ignoreMissing <- Seq(true, false)) {
val (metaProps, offlineDirs) = getMetadataWithMultipleMetaPropLogDirs(Seq(props1, props2), ignoreMissing, kraftMode = true)
assertEquals(MetaProperties.parse(props2), MetaProperties.parse(metaProps))
assertEquals(Seq.empty, offlineDirs)
}
}

@Test
def testInvalidMetaPropertiesWithMultipleVersionsInLogDirs(): Unit = {
// Let's create two directories with meta.properties one in v0 and v1.
val props1 = new RawMetaProperties()
props1.version = 0
props1.brokerId = 5
val props2 = new RawMetaProperties()
props2.version = 1
props2.clusterId = clusterIdBase64
props2.nodeId = 5
for (ignoreMissing <- Seq(true, false)) {
assertThrows(classOf[RuntimeException],
() => getMetadataWithMultipleMetaPropLogDirs(Seq(props1, props2), ignoreMissing, kraftMode = true))
}
}

private def getMetadataWithMultipleMetaPropLogDirs(metaProperties: Seq[RawMetaProperties],
ignoreMissing: Boolean,
kraftMode: Boolean): (RawMetaProperties, collection.Seq[String]) = {
val logDirs = mutable.Buffer[File]()
try {
for (mp <- metaProperties) {
val logDir = TestUtils.tempDirectory()
logDirs += logDir
val propFile = new File(logDir.getAbsolutePath, "meta.properties")
val fs = new FileOutputStream(propFile)
try {
mp.props.store(fs, "")
fs.flush()
fs.getFD.sync()
} finally {
Utils.closeQuietly(fs, propFile.getName)
}
}
BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(logDirs.map(_.getAbsolutePath), ignoreMissing, kraftMode)
} finally {
logDirs.foreach(logDir => CoreUtils.swallow(Utils.delete(logDir), this))
}
}

@Test
def testGetBrokerMetadataAndOfflineDirsWithNonexistentDirectories(): Unit = {
// Use a regular file as an invalid log dir to trigger an IO error
val invalidDir = TestUtils.tempFile("blah")
try {
// The `ignoreMissing` flag has no effect if there is an IO error
// The `ignoreMissing` and `kraftMode` flag has no effect if there is an IO error
testEmptyGetBrokerMetadataAndOfflineDirs(invalidDir,
expectedOfflineDirs = Seq(invalidDir), ignoreMissing = true)
expectedOfflineDirs = Seq(invalidDir), ignoreMissing = true, kraftMode = true)
testEmptyGetBrokerMetadataAndOfflineDirs(invalidDir,
expectedOfflineDirs = Seq(invalidDir), ignoreMissing = false)
expectedOfflineDirs = Seq(invalidDir), ignoreMissing = false, kraftMode = true)
} finally {
Utils.delete(invalidDir)
}
Expand All @@ -132,11 +204,16 @@ class BrokerMetadataCheckpointTest {
val tempDir = TestUtils.tempDirectory()
try {
testEmptyGetBrokerMetadataAndOfflineDirs(tempDir,
expectedOfflineDirs = Seq(), ignoreMissing = true)
expectedOfflineDirs = Seq(), ignoreMissing = true, kraftMode = true)
testEmptyGetBrokerMetadataAndOfflineDirs(tempDir,
expectedOfflineDirs = Seq(), ignoreMissing = true, kraftMode = false)

assertThrows(classOf[RuntimeException],
() => BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(
Seq(tempDir.getAbsolutePath), false))
Seq(tempDir.getAbsolutePath), ignoreMissing = false, kraftMode = false))
assertThrows(classOf[RuntimeException],
() => BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(
Seq(tempDir.getAbsolutePath), ignoreMissing = false, kraftMode = true))
} finally {
Utils.delete(tempDir)
}
Expand All @@ -145,10 +222,11 @@ class BrokerMetadataCheckpointTest {
private def testEmptyGetBrokerMetadataAndOfflineDirs(
logDir: File,
expectedOfflineDirs: Seq[File],
ignoreMissing: Boolean
ignoreMissing: Boolean,
kraftMode: Boolean
): Unit = {
val (metaProperties, offlineDirs) = BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(
Seq(logDir.getAbsolutePath), ignoreMissing)
Seq(logDir.getAbsolutePath), ignoreMissing, kraftMode = false)
assertEquals(expectedOfflineDirs.map(_.getAbsolutePath), offlineDirs)
assertEquals(new Properties(), metaProperties.props)
}
Expand Down