Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File],
def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)

/**
* Determine directory ID for each directory with a meta.properties.
* Determine directory ID for each directory.
* If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties.
* Directories without a meta.properties don't get a directory ID assigned.
* Directories without a meta.properties file, which can only occur in Zk mode, will file have a directory ID assigned.
* The ID will be written when the new meta.properties file is first written.
*/
private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
val s = scala.collection.mutable.Set[Uuid]()
dirs.flatMap { dir =>
try {
val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile))
metadataCheckpoint.read().map { props =>
val rawMetaProperties = new RawMetaProperties(props)
val uuid = rawMetaProperties.directoryId match {
case Some(uuidStr) => Uuid.fromString(uuidStr)
case None =>
val uuid = Uuid.randomUuid()
rawMetaProperties.directoryId = uuid.toString
metadataCheckpoint.write(rawMetaProperties.props)
uuid
val uuid = metadataCheckpoint.read() match {
case Some(props) => {
val rawMetaProperties = new RawMetaProperties(props)
val uuid_from_properties = rawMetaProperties.directoryId match {
case Some(uuidStr) => Uuid.fromString(uuidStr)
case None =>
val uuid_new = Uuid.randomUuid()
rawMetaProperties.directoryId = uuid_new.toString
metadataCheckpoint.write(rawMetaProperties.props)
uuid_new
}
uuid_from_properties
}
case None => {
Uuid.randomUuid()
Comment on lines +296 to +297
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this, maybe I'm missing something. So if meta.properties does not exist for some directory, we generate a random one, but it seems the generated value isn't persisted? That means the same log directory will get a different ID ever time the broker restarts, which will generate a lot of unnecessary reassignments...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will get persisted. The only case where we allow a directory to not have a meta.properties file is in Zk mode. In Zk mode on startup, all directories without a meta.properties file have one created immediately after the broker is registered with ZooKeeper.

}
dir.getAbsolutePath -> uuid
}.toMap
}
if (s.contains(uuid)) {
throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}")
}
Comment on lines +300 to +302
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative to maintaining a mutable set and modifying it from within the flatMap lamda could be to compare the size of the result with .toSet.size or .toSeq.distinct.size e.g.

if (dirIds.values.toSet.size != dirIds.values.size)
  throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps KafkaException is more suitable than RuntimeException?

s += uuid
Seq(dir.getAbsolutePath -> uuid)
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while loading ID $dir", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ case class MetaProperties(
}
}

// Return only the RawMetaProperties that are the same across all directories.
object BrokerMetadataCheckpoint extends Logging {
def getBrokerMetadataAndOfflineDirs(
logDirs: collection.Seq[String],
Expand All @@ -184,6 +185,8 @@ object BrokerMetadataCheckpoint extends Logging {
try {
brokerCheckpoint.read() match {
case Some(properties) =>
// XXX Should we check for duplicates here
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is LogManager.directoryIds() a better place to check for duplicates?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added it there. Let me know what you think

properties.remove(DirectoryIdKey)
brokerMetadataMap += logDir -> properties
case None =>
if (!ignoreMissing) {
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import java.io.{File, IOException}
import java.net.{InetAddress, SocketTimeoutException}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.Properties
import scala.collection.{Map, Seq}
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -1025,9 +1026,11 @@ class KafkaServer(
*/
private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
val props = brokerMetadata.toProperties.clone().asInstanceOf[Properties]
props.setProperty("directory.id", logManager.directoryId(logDir).get.toString)
val checkpoint = brokerMetadataCheckpoints(logDir)
try {
checkpoint.write(brokerMetadata.toProperties)
checkpoint.write(props)
} catch {
case e: IOException =>
val dirPath = checkpoint.file.getAbsolutePath
Expand Down
29 changes: 27 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,34 @@ class LogManagerTest {

assertTrue(logManager.directoryId(dirs(0).getAbsolutePath).isDefined)
assertEquals(Some(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q")), logManager.directoryId(dirs(1).getAbsolutePath))
assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath))
// We allocate a directoryId even if there isn't a meta.properties.
assertTrue(logManager.directoryId(dirs(2).getAbsolutePath).isDefined)
assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), logManager.directoryId(dirs(3).getAbsolutePath))
assertTrue(logManager.directoryId(dirs(4).getAbsolutePath).isDefined)
assertEquals(4, logManager.directoryIds.size)
assertEquals(5, logManager.directoryIds.size)
}

@Test
def testLoadDirectoryIdsWithDuplicates(): Unit = {
def writeMetaProperties(dir: File, id: Option[String] = None): Unit = {
val rawProps = new RawMetaProperties()
rawProps.nodeId = 1
rawProps.clusterId = "IVT1Seu3QjacxS7oBTKhDQ"
id.foreach(v => rawProps.directoryId = v)
new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)).write(rawProps.props)
}
val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
writeMetaProperties(dirs(0))
writeMetaProperties(dirs(1), Some("ZwkGXjB0TvSF6mjVh6gO7Q"))
// no meta.properties on dirs(2)
writeMetaProperties(dirs(3), Some("kQfNPJ2FTHq_6Qlyyv6Jqg"))
writeMetaProperties(dirs(4), Some("ZwkGXjB0TvSF6mjVh6gO7Q"))

assertTrue(try {
logManager = createLogManager(dirs)
false
} catch {
case e: RuntimeException => true
})
}
}