From 66f0c830a46ce5338eb179000cc20da1068ddd52 Mon Sep 17 00:00:00 2001 From: Justine Olshan Date: Thu, 15 Jul 2021 14:00:32 -0700 Subject: [PATCH 1/3] Fix perf regression on LISR requests by asynchronously flushing the partition.metadata file (#11056) After noticing increased LISR times, we discovered a lot of time was spent synchronously flushing the partition metadata file. This PR changes the code so we asynchronously flush the files. We ensure files are flushed before appending, renaming or closing the log to ensure we have the partition metadata information on disk. Three new tests have been added to address these cases. Reviewers: Lucas Bradstreet , Jun Rao --- .../org/apache/kafka/common/utils/Utils.java | 14 ++ .../main/scala/kafka/cluster/Partition.scala | 3 +- core/src/main/scala/kafka/log/Log.scala | 21 ++ .../kafka/server/PartitionMetadataFile.scala | 61 +++-- .../test/scala/unit/kafka/log/LogTest.scala | 55 +++- .../jmh/server/PartitionCreationBench.java | 237 ++++++++++++++++++ 6 files changed, 367 insertions(+), 24 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index da89181f7bea1..595314ccb7705 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -47,6 +47,7 @@ import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; @@ -911,6 +912,19 @@ public static void atomicMoveWithFallback(Path source, Path target) throws IOExc } } + /** + * Flushes dirty directories to guarantee crash consistency. + * + * @throws IOException if flushing the directory fails. + */ + public static void flushDir(Path path) throws IOException { + if (path != null) { + try (FileChannel dir = FileChannel.open(path, StandardOpenOption.READ)) { + dir.force(true); + } + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9b35a00dc1497..a58f4238ffc2d 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -449,8 +449,7 @@ class Partition(val topicPartition: TopicPartition, // This is because if the broker previously wrote it to file, it would be recovered on restart after failure. // Topic ID is consistent since we are just setting it here. if (log.topicId == Uuid.ZERO_UUID) { - log.partitionMetadataFile.write(requestTopicId) - log.topicId = requestTopicId + log.assignTopicId(requestTopicId) true } else if (log.topicId != requestTopicId) { stateChangeLogger.error(s"Topic Id in memory: ${log.topicId} does not" + diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2729154bab4ac..48c411d4b3ec4 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -601,6 +601,21 @@ class Log(@volatile private var _dir: File, } } + private def maybeFlushMetadataFile(): Unit = { + partitionMetadataFile.maybeFlush() + } + + /** Only used for ZK clusters when we update and start using topic IDs on existing topics */ + def assignTopicId(topicId: Uuid): Unit = { + if (keepPartitionMetadataFile) { + this.topicId = topicId + if (!partitionMetadataFile.exists()) { + partitionMetadataFile.record(topicId) + scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile) + } + } + } + /** * Removes any temporary files found in log directory, and creates a list of all .swap files which could be swapped * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than @@ -1048,6 +1063,7 @@ class Log(@volatile private var _dir: File, def close(): Unit = { debug("Closing log") lock synchronized { + maybeFlushMetadataFile() checkIfMemoryMappedBufferClosed() producerExpireCheck.cancel(true) maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") { @@ -1068,6 +1084,8 @@ class Log(@volatile private var _dir: File, def renameDir(name: String): Unit = { lock synchronized { maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") { + // Flush partitionMetadata file before initializing again + maybeFlushMetadataFile() val renamedDir = new File(dir.getParent, name) Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath) if (renamedDir != dir) { @@ -1152,6 +1170,9 @@ class Log(@volatile private var _dir: File, validateAndAssignOffsets: Boolean, leaderEpoch: Int, ignoreRecordSize: Boolean): LogAppendInfo = { + // We want to ensure the partition metadata file is written to the log dir before any log data is written to disk. + // This will ensure that any log data can be recovered with the correct topic ID in the case of failure. + maybeFlushMetadataFile() val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch) diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala index 25b1ba6129d2b..1ccbcc1b0ab9b 100644 --- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala +++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala @@ -24,7 +24,7 @@ import java.util.regex.Pattern import kafka.utils.Logging import org.apache.kafka.common.Uuid -import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException} import org.apache.kafka.common.utils.Utils @@ -90,27 +90,48 @@ class PartitionMetadataFile(val file: File, private val tempPath = Paths.get(path.toString + ".tmp") private val lock = new Object() private val logDir = file.getParentFile.getParent + @volatile private var dirtyTopicIdOpt : Option[Uuid] = None + + /** + * Records the topic ID that will be flushed to disk. + */ + def record(topicId: Uuid): Unit = { + // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ. + dirtyTopicIdOpt.foreach { dirtyTopicId => + if (dirtyTopicId != topicId) + throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " + + s"but had already recorded $dirtyTopicId") + } + dirtyTopicIdOpt = Some(topicId) + } - def write(topicId: Uuid): Unit = { - lock synchronized { - try { - // write to temp file and then swap with the existing file - val fileOutputStream = new FileOutputStream(tempPath.toFile) - val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)) - try { - writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion,topicId))) - writer.flush() - fileOutputStream.getFD().sync() - } finally { - writer.close() - } + def maybeFlush(): Unit = { + // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path + dirtyTopicIdOpt.foreach { _ => + // We synchronize on the actual write to disk + lock synchronized { + dirtyTopicIdOpt.foreach { topicId => + try { + // write to temp file and then swap with the existing file + val fileOutputStream = new FileOutputStream(tempPath.toFile) + val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)) + try { + writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId))) + writer.flush() + fileOutputStream.getFD().sync() + } finally { + writer.close() + } - Utils.atomicMoveWithFallback(tempPath, path) - } catch { - case e: IOException => - val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}" - logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e) - throw new KafkaStorageException(msg, e) + Utils.atomicMoveWithFallback(tempPath, path) + } catch { + case e: IOException => + val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}" + logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e) + throw new KafkaStorageException(msg, e) + } + dirtyTopicIdOpt = None + } } } } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 66ee5d2538c20..66d7a16ddbf89 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -2533,7 +2533,7 @@ class LogTest { var log = createLog(logDir, logConfig) val topicId = Uuid.randomUuid() - log.partitionMetadataFile.write(topicId) + log.assignTopicId(topicId) log.close() // test recovery case @@ -2542,6 +2542,37 @@ class LogTest { log.close() } + def testLogFlushesPartitionMetadataOnAppend(): Unit = { + val logConfig = LogTest.createLogConfig() + val log = createLog(logDir, logConfig) + val record = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("simpleValue".getBytes)) + + val topicId = Uuid.randomUuid() + log.partitionMetadataFile.record(topicId) + + // Should trigger a synchronous flush + log.appendAsLeader(record, leaderEpoch = 0) + assertTrue(log.partitionMetadataFile.exists()) + assertEquals(topicId, log.partitionMetadataFile.read().topicId) + } + + @Test + def testLogFlushesPartitionMetadataOnClose(): Unit = { + val logConfig = LogTest.createLogConfig() + var log = createLog(logDir, logConfig) + + val topicId = Uuid.randomUuid() + log.partitionMetadataFile.record(topicId) + + // Should trigger a synchronous flush + log.close() + + // We open the log again, and the partition metadata file should exist with the same ID. + log = createLog(logDir, logConfig) + assertTrue(log.partitionMetadataFile.exists()) + assertEquals(topicId, log.partitionMetadataFile.read().topicId) + } + /** * Test building the time index on the follower by setting assignOffsets to false. */ @@ -3103,7 +3134,7 @@ class LogTest { // Write a topic ID to the partition metadata file to ensure it is transferred correctly. val id = Uuid.randomUuid() log.topicId = id - log.partitionMetadataFile.write(id) + log.assignTopicId(id) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) assertEquals(Some(5), log.latestEpoch) @@ -3121,6 +3152,26 @@ class LogTest { assertEquals(id, log.partitionMetadataFile.read().topicId) } + @Test + def testTopicIdFlushesBeforeDirectoryRename(): Unit = { + val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + val log = createLog(logDir, logConfig) + + // Write a topic ID to the partition metadata file to ensure it is transferred correctly. + val topicId = Uuid.randomUuid() + log.partitionMetadataFile.record(topicId) + + // Ensure that after a directory rename, the partition metadata file is written to the right location. + val tp = Log.parseTopicPartitionName(log.dir) + log.renameDir(Log.logDeleteDirName(tp)) + assertTrue(PartitionMetadataFile.newFile(log.dir).exists()) + assertFalse(PartitionMetadataFile.newFile(this.logDir).exists()) + + // Check the file holds the correct contents. + assertTrue(log.partitionMetadataFile.exists()) + assertEquals(topicId, log.partitionMetadataFile.read().topicId) + } + @Test def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java new file mode 100644 index 0000000000000..d78bad362885e --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.server; + +import java.util.Properties; + +import kafka.cluster.Partition; +import kafka.log.CleanerConfig; +import kafka.log.Defaults; +import kafka.log.LogConfig; +import kafka.log.LogManager; +import kafka.server.AlterIsrManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.LogDirFailureChannel; +import kafka.server.MetadataCache; +import kafka.server.QuotaFactory; +import kafka.server.ReplicaManager; +import kafka.server.ZkMetadataCache; +import kafka.server.checkpoints.OffsetCheckpoints; +import kafka.server.metadata.CachedConfigRepository; +import kafka.server.metadata.ConfigRepository; +import kafka.utils.KafkaScheduler; +import kafka.utils.Scheduler; +import kafka.utils.TestUtils; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.LeaderAndIsrRequestData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; +import scala.Option; + +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@Fork(3) +@BenchmarkMode(Mode.AverageTime) +@State(value = Scope.Benchmark) +public class PartitionCreationBench { + @Param({"false", "true"}) + public boolean useTopicIds; + + @Param({"2000"}) + public int numPartitions; + + private final String topicName = "foo"; + + private Option topicId; + private Scheduler scheduler; + private Metrics metrics; + private Time time; + private KafkaConfig brokerProperties; + + private ReplicaManager replicaManager; + private QuotaFactory.QuotaManagers quotaManagers; + private LogDirFailureChannel failureChannel; + private LogManager logManager; + private AlterIsrManager alterIsrManager; + private List topicPartitions; + + @SuppressWarnings("deprecation") + @Setup(Level.Invocation) + public void setup() { + if (useTopicIds) + topicId = Option.apply(Uuid.randomUuid()); + else + topicId = Option.empty(); + + this.scheduler = new KafkaScheduler(1, "scheduler-thread", true); + this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( + 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(), + Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, + (short) 1)); + this.metrics = new Metrics(); + this.time = Time.SYSTEM; + this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); + final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); + final List files = + JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); + CleanerConfig cleanerConfig = CleanerConfig.apply(1, + 4 * 1024 * 1024L, 0.9d, + 1024 * 1024, 32 * 1024 * 1024, + Double.MAX_VALUE, 15 * 1000, true, "MD5"); + + ConfigRepository configRepository = new CachedConfigRepository(); + this.logManager = new LogManager(JavaConverters.asScalaIteratorConverter(files.iterator()).asScala().toSeq(), + JavaConverters.asScalaIteratorConverter(new ArrayList().iterator()).asScala().toSeq(), + configRepository, + createLogConfig(), + cleanerConfig, + 1, + 1000L, + 10000L, + 10000L, + 1000L, + 60000, + scheduler, + brokerTopicStats, + failureChannel, + Time.SYSTEM, + true); + scheduler.startup(); + final MetadataCache metadataCache = + new ZkMetadataCache(this.brokerProperties.brokerId()); + this.quotaManagers = + QuotaFactory.instantiate(this.brokerProperties, + this.metrics, + this.time, ""); + + KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) { + @Override + public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) { + return new Properties(); + } + }; + this.alterIsrManager = TestUtils.createAlterIsrManager(); + this.replicaManager = new ReplicaManager( + this.brokerProperties, + this.metrics, + this.time, + Option.apply(zkClient), + this.scheduler, + this.logManager, + new AtomicBoolean(false), + this.quotaManagers, + brokerTopicStats, + metadataCache, + this.failureChannel, + alterIsrManager, + configRepository, + Option.empty()); + replicaManager.startup(); + replicaManager.checkpointHighWatermarks(); + } + + @TearDown(Level.Invocation) + public void tearDown() throws Exception { + this.replicaManager.shutdown(false); + logManager.shutdown(); + this.metrics.close(); + this.scheduler.shutdown(); + this.quotaManagers.shutdown(); + for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) { + Utils.delete(dir); + } + } + + private static LogConfig createLogConfig() { + Properties logProps = new Properties(); + logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs()); + logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize()); + logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs()); + logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize()); + logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs()); + logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy()); + logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize()); + logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval()); + logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize()); + logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion()); + logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs()); + return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>()); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void makeFollower() { + topicPartitions = new ArrayList<>(); + for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) { + topicPartitions.add(new TopicPartition(topicName, partitionNum)); + } + + List replicas = new ArrayList<>(); + replicas.add(0); + replicas.add(1); + replicas.add(2); + + OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L); + for (TopicPartition topicPartition : topicPartitions) { + final Partition partition = this.replicaManager.createPartition(topicPartition); + List inSync = new ArrayList<>(); + inSync.add(0); + inSync.add(1); + inSync.add(2); + + LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState() + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(inSync) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true); + + partition.makeFollower(partitionState, checkpoints); + topicId.foreach(partition::checkOrSetTopicId); + } + } +} From 481e3eb5d2b92ea66a726cd440626bf0c969f785 Mon Sep 17 00:00:00 2001 From: Justine Date: Wed, 18 Aug 2021 09:55:11 -0700 Subject: [PATCH 2/3] Cleanups --- .../java/org/apache/kafka/common/utils/Utils.java | 14 -------------- core/src/main/scala/kafka/log/Log.scala | 8 ++++++++ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 595314ccb7705..da89181f7bea1 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -47,7 +47,6 @@ import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; @@ -912,19 +911,6 @@ public static void atomicMoveWithFallback(Path source, Path target) throws IOExc } } - /** - * Flushes dirty directories to guarantee crash consistency. - * - * @throws IOException if flushing the directory fails. - */ - public static void flushDir(Path path) throws IOException { - if (path != null) { - try (FileChannel dir = FileChannel.open(path, StandardOpenOption.READ)) { - dir.force(true); - } - } - } - /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 48c411d4b3ec4..aaac4ffff421f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -607,6 +607,14 @@ class Log(@volatile private var _dir: File, /** Only used for ZK clusters when we update and start using topic IDs on existing topics */ def assignTopicId(topicId: Uuid): Unit = { + if (!this.topicId.equals(Uuid.ZERO_UUID)) { + if (!this.topicId.equals(topicId)) { + // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower + throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," + + s"but log already contained topic ID ${this.topicId}") + } + } + if (keepPartitionMetadataFile) { this.topicId = topicId if (!partitionMetadataFile.exists()) { From ccceed2deb264e0668e3dfdfeaa95efa9f3c5f0d Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 24 Aug 2021 15:44:19 -0700 Subject: [PATCH 3/3] Remove extra assignTopicId --- core/src/main/scala/kafka/log/Log.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index bf8e549875ca9..debd25759cf7b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -577,14 +577,6 @@ class Log(@volatile private var _dir: File, partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel) } - /** Only used for ZK clusters when we update and start using topic IDs on existing topics */ - def assignTopicId(topicId: Uuid): Unit = { - if (keepPartitionMetadataFile) { - partitionMetadataFile.write(topicId) - this.topicId = topicId - } - } - private def initializeLeaderEpochCache(): Unit = lock synchronized { val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)