From 4e99467391e938208247e86b05a6c0b11e07a1ac Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 18 Oct 2023 19:26:40 -0400 Subject: [PATCH 1/5] Fix the overwrite to write back the updated meta.properties file with directory.id. Fix reading of the meta.properties file to ignore the directory.id when validating that all the meta.properties files belong to the same cluster. --- .../main/scala/kafka/server/BrokerMetadataCheckpoint.scala | 3 +++ core/src/main/scala/kafka/server/KafkaServer.scala | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala index 3d00ebdf3f7c9..37d1ce2c2a358 100755 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -166,6 +166,7 @@ case class MetaProperties( } } +// Return only the RawMetaProperties that are the same across all directoris. 
object BrokerMetadataCheckpoint extends Logging { def getBrokerMetadataAndOfflineDirs( logDirs: collection.Seq[String], @@ -184,6 +185,8 @@ object BrokerMetadataCheckpoint extends Logging { try { brokerCheckpoint.read() match { case Some(properties) => + // XXX Should we check for duplicates here? + properties.remove(DirectoryIdKey) brokerMetadataMap += logDir -> properties case None => if (!ignoreMissing) { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a5a05c977e5d4..350d7b70209a1 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -63,6 +63,7 @@ import java.io.{File, IOException} import java.net.{InetAddress, SocketTimeoutException} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.Properties import scala.collection.{Map, Seq} import scala.compat.java8.OptionConverters.RichOptionForJava8 import scala.jdk.CollectionConverters._ @@ -1025,9 +1026,11 @@ class KafkaServer( */ private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = { for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) { + val props = brokerMetadata.toProperties.clone().asInstanceOf[Properties] + props.setProperty("directory.id", logManager.directoryId(logDir).get.toString) val checkpoint = brokerMetadataCheckpoints(logDir) try { - checkpoint.write(brokerMetadata.toProperties) + checkpoint.write(props) } catch { case e: IOException => val dirPath = checkpoint.file.getAbsolutePath From ed86a4a535a333bd4e5ab97233c4064f07897771 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 18 Oct 2023 21:26:28 -0400 Subject: [PATCH 2/5] In Zk mode we allow for directories to have no meta.properties file. We create the file and in this case we MUST create a random Uuid for it. 
--- core/src/main/scala/kafka/server/KafkaServer.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 350d7b70209a1..76debc32a9b3d 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -45,7 +45,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{Endpoint, KafkaException, Node, TopicPartition} +import org.apache.kafka.common.{Endpoint, KafkaException, Node, TopicPartition, Uuid} import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange} import org.apache.kafka.raft.RaftConfig @@ -1027,7 +1027,9 @@ class KafkaServer( private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = { for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) { val props = brokerMetadata.toProperties.clone().asInstanceOf[Properties] - props.setProperty("directory.id", logManager.directoryId(logDir).get.toString) + // If the Uuid is not set then we set it here because originally we ignored + // directories with no meta.properties file and we are creating it here. 
+ props.setProperty("directory.id", logManager.directoryId(logDir).getOrElse(Uuid.randomUuid()).toString) val checkpoint = brokerMetadataCheckpoints(logDir) try { checkpoint.write(props) From 26967e359c273ca2c0560aec8bb34777fc593f08 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Mon, 23 Oct 2023 18:33:47 -0400 Subject: [PATCH 3/5] Address a couple review comments --- .../src/main/scala/kafka/log/LogManager.scala | 33 +++++++++++-------- .../server/BrokerMetadataCheckpoint.scala | 2 +- .../main/scala/kafka/server/KafkaServer.scala | 6 ++-- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index e420b0df435da..ed982032639b3 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -270,26 +270,33 @@ class LogManager(logDirs: Seq[File], def directoryId(dir: String): Option[Uuid] = dirIds.get(dir) /** - * Determine directory ID for each directory with a meta.properties. + * Determine directory ID for each directory. * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. - * Directories without a meta.properties don't get a directory ID assigned. + * Directories without a meta.properties file, which can only occur in Zk mode, will still have a directory ID assigned. + * The ID will be written when the new meta.properties file is first written. 
*/ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { dirs.flatMap { dir => try { val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) - metadataCheckpoint.read().map { props => - val rawMetaProperties = new RawMetaProperties(props) - val uuid = rawMetaProperties.directoryId match { - case Some(uuidStr) => Uuid.fromString(uuidStr) - case None => - val uuid = Uuid.randomUuid() - rawMetaProperties.directoryId = uuid.toString - metadataCheckpoint.write(rawMetaProperties.props) - uuid + val uuid = metadataCheckpoint.read() match { + case Some(props) => { + val rawMetaProperties = new RawMetaProperties(props) + val uuid_from_properties = rawMetaProperties.directoryId match { + case Some(uuidStr) => Uuid.fromString(uuidStr) + case None => + val uuid_new = Uuid.randomUuid() + rawMetaProperties.directoryId = uuid_new.toString + metadataCheckpoint.write(rawMetaProperties.props) + uuid_new + } + uuid_from_properties + } + case None => { + Uuid.randomUuid() } - dir.getAbsolutePath -> uuid - }.toMap + } + Seq(dir.getAbsolutePath -> uuid) } catch { case e: IOException => logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while loading ID $dir", e) diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala index 37d1ce2c2a358..21705907347f2 100755 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -166,7 +166,7 @@ case class MetaProperties( } } -// Return only the RawMetaProperties that are the same across all directoris. +// Return only the RawMetaProperties that are the same across all directories. 
object BrokerMetadataCheckpoint extends Logging { def getBrokerMetadataAndOfflineDirs( logDirs: collection.Seq[String], diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 76debc32a9b3d..350d7b70209a1 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -45,7 +45,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{Endpoint, KafkaException, Node, TopicPartition, Uuid} +import org.apache.kafka.common.{Endpoint, KafkaException, Node, TopicPartition} import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange} import org.apache.kafka.raft.RaftConfig @@ -1027,9 +1027,7 @@ class KafkaServer( private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = { for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) { val props = brokerMetadata.toProperties.clone().asInstanceOf[Properties] - // If the Uuid is not set then we set it here because originally we ignored - // directories with no meta.properties file and we are creating it here. 
- props.setProperty("directory.id", logManager.directoryId(logDir).getOrElse(Uuid.randomUuid()).toString) + props.setProperty("directory.id", logManager.directoryId(logDir).get.toString) val checkpoint = brokerMetadataCheckpoints(logDir) try { checkpoint.write(props) From c7f7104b88d84b8f684a6761e3873c376e5c0471 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Tue, 24 Oct 2023 02:22:50 -0400 Subject: [PATCH 4/5] Check for duplicate directory.ids and fail broker start if there are any --- core/src/main/scala/kafka/log/LogManager.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index ed982032639b3..805a8f7e037de 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -276,6 +276,7 @@ class LogManager(logDirs: Seq[File], * The ID will be written when the new meta.properties file is first written. */ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { + val s = scala.collection.mutable.Set[Uuid]() dirs.flatMap { dir => try { val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) @@ -296,6 +297,10 @@ class LogManager(logDirs: Seq[File], Uuid.randomUuid() } } + if (s.contains(uuid)) { + throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}") + } + s += uuid Seq(dir.getAbsolutePath -> uuid) } catch { case e: IOException => From 7832a721b9901b58eac02e59a6165c100d095ccf Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Tue, 24 Oct 2023 11:53:11 -0400 Subject: [PATCH 5/5] Fix testLoadDirectoryIds and add testLoadDirectoryIdsWithDuplicates --- .../scala/unit/kafka/log/LogManagerTest.scala | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 7e35857c5f731..7a00640b3d1fd 
100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -1031,9 +1031,34 @@ class LogManagerTest { assertTrue(logManager.directoryId(dirs(0).getAbsolutePath).isDefined) assertEquals(Some(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q")), logManager.directoryId(dirs(1).getAbsolutePath)) - assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath)) + // We allocate a directoryId even if there isn't a meta.properties. + assertTrue(logManager.directoryId(dirs(2).getAbsolutePath).isDefined) assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), logManager.directoryId(dirs(3).getAbsolutePath)) assertTrue(logManager.directoryId(dirs(4).getAbsolutePath).isDefined) - assertEquals(4, logManager.directoryIds.size) + assertEquals(5, logManager.directoryIds.size) + } + + @Test + def testLoadDirectoryIdsWithDuplicates(): Unit = { + def writeMetaProperties(dir: File, id: Option[String] = None): Unit = { + val rawProps = new RawMetaProperties() + rawProps.nodeId = 1 + rawProps.clusterId = "IVT1Seu3QjacxS7oBTKhDQ" + id.foreach(v => rawProps.directoryId = v) + new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)).write(rawProps.props) + } + val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir()) + writeMetaProperties(dirs(0)) + writeMetaProperties(dirs(1), Some("ZwkGXjB0TvSF6mjVh6gO7Q")) + // no meta.properties on dirs(2) + writeMetaProperties(dirs(3), Some("kQfNPJ2FTHq_6Qlyyv6Jqg")) + writeMetaProperties(dirs(4), Some("ZwkGXjB0TvSF6mjVh6gO7Q")) + + assertTrue(try { + logManager = createLogManager(dirs) + false + } catch { + case e: RuntimeException => true + }) } }