diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0760816786bca..debd25759cf7b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -577,14 +577,6 @@ class Log(@volatile private var _dir: File,
     partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
   }
 
-  /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
-  def assignTopicId(topicId: Uuid): Unit = {
-    if (keepPartitionMetadataFile) {
-      partitionMetadataFile.write(topicId)
-      this.topicId = topicId
-    }
-  }
-
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
     val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
 
@@ -609,6 +601,29 @@ class Log(@volatile private var _dir: File,
     }
   }
 
+  private def maybeFlushMetadataFile(): Unit = {
+    partitionMetadataFile.maybeFlush()
+  }
+
+  /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
+  def assignTopicId(topicId: Uuid): Unit = {
+    if (!this.topicId.equals(Uuid.ZERO_UUID)) {
+      if (!this.topicId.equals(topicId)) {
+        // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower
+        throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
+          s"but log already contained topic ID ${this.topicId}")
+      }
+    }
+
+    if (keepPartitionMetadataFile) {
+      this.topicId = topicId
+      if (!partitionMetadataFile.exists()) {
+        partitionMetadataFile.record(topicId)
+        scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
+      }
+    }
+  }
+
   /**
    * Removes any temporary files found in log directory, and creates a list of all .swap files which could be swapped
    * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
@@ -1056,6 +1071,7 @@ class Log(@volatile private var _dir: File,
   def close(): Unit = {
     debug("Closing log")
     lock synchronized {
+      maybeFlushMetadataFile()
       checkIfMemoryMappedBufferClosed()
       producerExpireCheck.cancel(true)
       maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
@@ -1076,6 +1092,8 @@ class Log(@volatile private var _dir: File,
   def renameDir(name: String): Unit = {
     lock synchronized {
       maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
+        // Flush partitionMetadata file before initializing again
+        maybeFlushMetadataFile()
         val renamedDir = new File(dir.getParent, name)
         Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
         if (renamedDir != dir) {
@@ -1160,6 +1178,9 @@ class Log(@volatile private var _dir: File,
                      validateAndAssignOffsets: Boolean,
                      leaderEpoch: Int,
                      ignoreRecordSize: Boolean): LogAppendInfo = {
+    // We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
+    // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
+    maybeFlushMetadataFile()
     val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
 
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 25b1ba6129d2b..1ccbcc1b0ab9b 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 import kafka.utils.Logging
 
 import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
 import org.apache.kafka.common.utils.Utils
 
@@ -90,27 +90,48 @@ class PartitionMetadataFile(val file: File,
   private val tempPath = Paths.get(path.toString + ".tmp")
   private val lock = new Object()
   private val logDir = file.getParentFile.getParent
+  @volatile private var dirtyTopicIdOpt : Option[Uuid] = None
+
+  /**
+   * Records the topic ID that will be flushed to disk.
+   */
+  def record(topicId: Uuid): Unit = {
+    // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
+    dirtyTopicIdOpt.foreach { dirtyTopicId =>
+      if (dirtyTopicId != topicId)
+        throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " +
+          s"but had already recorded $dirtyTopicId")
+    }
+    dirtyTopicIdOpt = Some(topicId)
+  }
 
-  def write(topicId: Uuid): Unit = {
-    lock synchronized {
-      try {
-        // write to temp file and then swap with the existing file
-        val fileOutputStream = new FileOutputStream(tempPath.toFile)
-        val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
-        try {
-          writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion,topicId)))
-          writer.flush()
-          fileOutputStream.getFD().sync()
-        } finally {
-          writer.close()
-        }
+  def maybeFlush(): Unit = {
+    // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
+    dirtyTopicIdOpt.foreach { _ =>
+      // We synchronize on the actual write to disk
+      lock synchronized {
+        dirtyTopicIdOpt.foreach { topicId =>
+          try {
+            // write to temp file and then swap with the existing file
+            val fileOutputStream = new FileOutputStream(tempPath.toFile)
+            val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+            try {
+              writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
+              writer.flush()
+              fileOutputStream.getFD().sync()
+            } finally {
+              writer.close()
+            }
 
-        Utils.atomicMoveWithFallback(tempPath, path)
-      } catch {
-        case e: IOException =>
-          val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
-          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
-          throw new KafkaStorageException(msg, e)
+            Utils.atomicMoveWithFallback(tempPath, path)
+          } catch {
+            case e: IOException =>
+              val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
+              logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+              throw new KafkaStorageException(msg, e)
+          }
+          dirtyTopicIdOpt = None
+        }
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 947bcb7042c15..b29471310879e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2547,7 +2547,7 @@ class LogTest {
     var log = createLog(logDir, logConfig)
     val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.write(topicId)
+    log.assignTopicId(topicId)
     log.close()
 
     // test recovery case
@@ -2556,6 +2556,37 @@ class LogTest {
     log.close()
   }
 
+  @Test
+  def testLogFlushesPartitionMetadataOnAppend(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    val log = createLog(logDir, logConfig)
+    val record = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("simpleValue".getBytes))
+
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.record(topicId)
+
+    // Should trigger a synchronous flush
+    log.appendAsLeader(record, leaderEpoch = 0)
+    assertTrue(log.partitionMetadataFile.exists())
+    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+  }
+
+  @Test
+  def testLogFlushesPartitionMetadataOnClose(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    var log = createLog(logDir, logConfig)
+
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.record(topicId)
+
+    // Should trigger a synchronous flush
+    log.close()
+
+    // We open the log again, and the partition metadata file should exist with the same ID.
+    log = createLog(logDir, logConfig)
+    assertTrue(log.partitionMetadataFile.exists())
+    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+  }
+
   /**
    * Test building the time index on the follower by setting assignOffsets to false.
    */
@@ -3117,7 +3148,7 @@ class LogTest {
     // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
     val id = Uuid.randomUuid()
     log.topicId = id
-    log.partitionMetadataFile.write(id)
+    log.assignTopicId(id)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)
 
@@ -3135,6 +3166,26 @@ class LogTest {
     assertEquals(id, log.partitionMetadataFile.read().topicId)
   }
 
+  @Test
+  def testTopicIdFlushesBeforeDirectoryRename(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.record(topicId)
+
+    // Ensure that after a directory rename, the partition metadata file is written to the right location.
+    val tp = Log.parseTopicPartitionName(log.dir)
+    log.renameDir(Log.logDeleteDirName(tp))
+    assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
+    assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
+
+    // Check the file holds the correct contents.
+    assertTrue(log.partitionMetadataFile.exists())
+    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+  }
+
   @Test
   def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
new file mode 100644
index 0000000000000..d78bad362885e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.server;
+
+import java.util.Properties;
+
+import kafka.cluster.Partition;
+import kafka.log.CleanerConfig;
+import kafka.log.Defaults;
+import kafka.log.LogConfig;
+import kafka.log.LogManager;
+import kafka.server.AlterIsrManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.LogDirFailureChannel;
+import kafka.server.MetadataCache;
+import kafka.server.QuotaFactory;
+import kafka.server.ReplicaManager;
+import kafka.server.ZkMetadataCache;
+import kafka.server.checkpoints.OffsetCheckpoints;
+import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.ConfigRepository;
+import kafka.utils.KafkaScheduler;
+import kafka.utils.Scheduler;
+import kafka.utils.TestUtils;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.LeaderAndIsrRequestData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+import scala.Option;
+
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(3)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+public class PartitionCreationBench {
+    @Param({"false", "true"})
+    public boolean useTopicIds;
+
+    @Param({"2000"})
+    public int numPartitions;
+
+    private final String topicName = "foo";
+
+    private Option<Uuid> topicId;
+    private Scheduler scheduler;
+    private Metrics metrics;
+    private Time time;
+    private KafkaConfig brokerProperties;
+
+    private ReplicaManager replicaManager;
+    private QuotaFactory.QuotaManagers quotaManagers;
+    private LogDirFailureChannel failureChannel;
+    private LogManager logManager;
+    private AlterIsrManager alterIsrManager;
+    private List<TopicPartition> topicPartitions;
+
+    @SuppressWarnings("deprecation")
+    @Setup(Level.Invocation)
+    public void setup() {
+        if (useTopicIds)
+            topicId = Option.apply(Uuid.randomUuid());
+        else
+            topicId = Option.empty();
+
+        this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
+        this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
+                0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
+                Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
+                (short) 1));
+        this.metrics = new Metrics();
+        this.time = Time.SYSTEM;
+        this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
+        final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        final List<File> files =
+            JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        CleanerConfig cleanerConfig = CleanerConfig.apply(1,
+                4 * 1024 * 1024L, 0.9d,
+                1024 * 1024, 32 * 1024 * 1024,
+                Double.MAX_VALUE, 15 * 1000, true, "MD5");
+
+        ConfigRepository configRepository = new CachedConfigRepository();
+        this.logManager = new LogManager(JavaConverters.asScalaIteratorConverter(files.iterator()).asScala().toSeq(),
+                JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
+                configRepository,
+                createLogConfig(),
+                cleanerConfig,
+                1,
+                1000L,
+                10000L,
+                10000L,
+                1000L,
+                60000,
+                scheduler,
+                brokerTopicStats,
+                failureChannel,
+                Time.SYSTEM,
+                true);
+        scheduler.startup();
+        final MetadataCache metadataCache =
+                new ZkMetadataCache(this.brokerProperties.brokerId());
+        this.quotaManagers =
+                QuotaFactory.instantiate(this.brokerProperties,
+                        this.metrics,
+                        this.time, "");
+
+        KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
+            @Override
+            public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
+                return new Properties();
+            }
+        };
+        this.alterIsrManager = TestUtils.createAlterIsrManager();
+        this.replicaManager = new ReplicaManager(
+                this.brokerProperties,
+                this.metrics,
+                this.time,
+                Option.apply(zkClient),
+                this.scheduler,
+                this.logManager,
+                new AtomicBoolean(false),
+                this.quotaManagers,
+                brokerTopicStats,
+                metadataCache,
+                this.failureChannel,
+                alterIsrManager,
+                configRepository,
+                Option.empty());
+        replicaManager.startup();
+        replicaManager.checkpointHighWatermarks();
+    }
+
+    @TearDown(Level.Invocation)
+    public void tearDown() throws Exception {
+        this.replicaManager.shutdown(false);
+        logManager.shutdown();
+        this.metrics.close();
+        this.scheduler.shutdown();
+        this.quotaManagers.shutdown();
+        for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
+            Utils.delete(dir);
+        }
+    }
+
+    private static LogConfig createLogConfig() {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
+        logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
+        logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
+        logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
+        logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
+        logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
+        logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
+        logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
+        logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
+        logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
+        logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
+        return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
+    }
+
+    @Benchmark
+    @Threads(1)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public void makeFollower() {
+        topicPartitions = new ArrayList<>();
+        for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
+            topicPartitions.add(new TopicPartition(topicName, partitionNum));
+        }
+
+        List<Integer> replicas = new ArrayList<>();
+        replicas.add(0);
+        replicas.add(1);
+        replicas.add(2);
+
+        OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
+        for (TopicPartition topicPartition : topicPartitions) {
+            final Partition partition = this.replicaManager.createPartition(topicPartition);
+            List<Integer> inSync = new ArrayList<>();
+            inSync.add(0);
+            inSync.add(1);
+            inSync.add(2);
+
+            LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+                .setControllerEpoch(0)
+                .setLeader(0)
+                .setLeaderEpoch(0)
+                .setIsr(inSync)
+                .setZkVersion(1)
+                .setReplicas(replicas)
+                .setIsNew(true);
+
+            partition.makeFollower(partitionState, checkpoints);
+            topicId.foreach(partition::checkOrSetTopicId);
+        }
+    }
+}
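Usage note (not part of the patch): a minimal sketch of the deferred-flush lifecycle the changes above introduce, written against the existing LogTest fixtures. createLog, logDir, and logConfig are assumed to be the test helpers already used in LogTest; everything else is taken from the diff.

    // Sketch only: assignTopicId() now records the topic ID in memory
    // (via PartitionMetadataFile.record) instead of fsyncing the file immediately.
    val log = createLog(logDir, logConfig)
    val topicId = Uuid.randomUuid()
    log.assignTopicId(topicId)

    // The first append (as well as close() and renameDir()) calls
    // maybeFlushMetadataFile(), which writes the recorded ID to disk before any
    // log data reaches the log directory.
    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
      new SimpleRecord("value".getBytes)), leaderEpoch = 0)

    assertTrue(log.partitionMetadataFile.exists())
    assertEquals(topicId, log.partitionMetadataFile.read().topicId)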