diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index e420b0df435da..805a8f7e037de 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File], def directoryId(dir: String): Option[Uuid] = dirIds.get(dir) /** - * Determine directory ID for each directory with a meta.properties. + * Determine directory ID for each directory. * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. - * Directories without a meta.properties don't get a directory ID assigned. + * Directories without a meta.properties file, which can only occur in Zk mode, will have a directory ID assigned. + * The ID will be written when the new meta.properties file is first written. */ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { + val s = scala.collection.mutable.Set[Uuid]() dirs.flatMap { dir => try { val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) - metadataCheckpoint.read().map { props => - val rawMetaProperties = new RawMetaProperties(props) - val uuid = rawMetaProperties.directoryId match { - case Some(uuidStr) => Uuid.fromString(uuidStr) - case None => - val uuid = Uuid.randomUuid() - rawMetaProperties.directoryId = uuid.toString - metadataCheckpoint.write(rawMetaProperties.props) - uuid + val uuid = metadataCheckpoint.read() match { + case Some(props) => { + val rawMetaProperties = new RawMetaProperties(props) + val uuid_from_properties = rawMetaProperties.directoryId match { + case Some(uuidStr) => Uuid.fromString(uuidStr) + case None => + val uuid_new = Uuid.randomUuid() + rawMetaProperties.directoryId = uuid_new.toString + metadataCheckpoint.write(rawMetaProperties.props) + uuid_new + } + uuid_from_properties + } + case None => { + Uuid.randomUuid() } - dir.getAbsolutePath -> uuid - }.toMap + 
} + if (s.contains(uuid)) { + throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}") + } + s += uuid + Seq(dir.getAbsolutePath -> uuid) } catch { case e: IOException => logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while loading ID $dir", e) diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala index 3d00ebdf3f7c9..21705907347f2 100755 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -166,6 +166,7 @@ case class MetaProperties( } } +// getBrokerMetadataAndOfflineDirs returns only the RawMetaProperties that are identical across all directories. object BrokerMetadataCheckpoint extends Logging { def getBrokerMetadataAndOfflineDirs( logDirs: collection.Seq[String], @@ -184,6 +185,8 @@ object BrokerMetadataCheckpoint extends Logging { try { brokerCheckpoint.read() match { case Some(properties) => + // TODO: should duplicate directory IDs be detected and rejected here as well? + properties.remove(DirectoryIdKey) brokerMetadataMap += logDir -> properties case None => if (!ignoreMissing) { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a5a05c977e5d4..350d7b70209a1 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -63,6 +63,7 @@ import java.io.{File, IOException} import java.net.{InetAddress, SocketTimeoutException} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.Properties import scala.collection.{Map, Seq} import scala.compat.java8.OptionConverters.RichOptionForJava8 import scala.jdk.CollectionConverters._ @@ -1025,9 +1026,11 @@ class KafkaServer( */ private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = { for (logDir <- config.logDirs if logManager.isLogDirOnline(new 
File(logDir).getAbsolutePath)) { + val props = brokerMetadata.toProperties.clone().asInstanceOf[Properties] + props.setProperty("directory.id", logManager.directoryId(logDir).get.toString) val checkpoint = brokerMetadataCheckpoints(logDir) try { - checkpoint.write(brokerMetadata.toProperties) + checkpoint.write(props) } catch { case e: IOException => val dirPath = checkpoint.file.getAbsolutePath diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 7e35857c5f731..7a00640b3d1fd 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -1031,9 +1031,34 @@ class LogManagerTest { assertTrue(logManager.directoryId(dirs(0).getAbsolutePath).isDefined) assertEquals(Some(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q")), logManager.directoryId(dirs(1).getAbsolutePath)) - assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath)) + // We allocate a directoryId even if there isn't a meta.properties. 
+ assertTrue(logManager.directoryId(dirs(2).getAbsolutePath).isDefined) assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), logManager.directoryId(dirs(3).getAbsolutePath)) assertTrue(logManager.directoryId(dirs(4).getAbsolutePath).isDefined) - assertEquals(4, logManager.directoryIds.size) + assertEquals(5, logManager.directoryIds.size) + } + + @Test + def testLoadDirectoryIdsWithDuplicates(): Unit = { + def writeMetaProperties(dir: File, id: Option[String] = None): Unit = { + val rawProps = new RawMetaProperties() + rawProps.nodeId = 1 + rawProps.clusterId = "IVT1Seu3QjacxS7oBTKhDQ" + id.foreach(v => rawProps.directoryId = v) + new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)).write(rawProps.props) + } + val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir()) + writeMetaProperties(dirs(0)) + writeMetaProperties(dirs(1), Some("ZwkGXjB0TvSF6mjVh6gO7Q")) + // no meta.properties on dirs(2) + writeMetaProperties(dirs(3), Some("kQfNPJ2FTHq_6Qlyyv6Jqg")) + writeMetaProperties(dirs(4), Some("ZwkGXjB0TvSF6mjVh6gO7Q")) + + assertTrue(try { + logManager = createLogManager(dirs) + false + } catch { + case e: RuntimeException => true + }) } }