From 0991ec1ff271e03d3d564f31c97dbbf615bb84e4 Mon Sep 17 00:00:00 2001 From: Gardner Vickers Date: Sat, 4 May 2019 22:43:46 -0700 Subject: [PATCH 01/10] Create benchmark for highwatermark checkpointing. Explicitly specify noop log4j logger to avoid warnings when benchmarking. --- build.gradle | 4 + .../server/HighwatermarkCheckpointBench.java | 151 ++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java diff --git a/build.gradle b/build.gradle index aa8bf97a286f3..338a8ba4b36fb 100644 --- a/build.gradle +++ b/build.gradle @@ -1540,10 +1540,14 @@ project(':jmh-benchmarks') { compile project(':core') compile project(':clients') compile project(':streams') + compile project(':core') + compile project(':clients').sourceSets.test.output + compile project(':core').sourceSets.test.output compile libs.jmhCore compile libs.mockitoCore annotationProcessor libs.jmhGeneratorAnnProcess compile libs.jmhCoreBenchmarks + compile group: 'org.slf4j', name: 'slf4j-nop', version: '1.7.26' } jar { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java new file mode 100644 index 0000000000000..2c90d31ffd5d7 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java @@ -0,0 +1,151 @@ +package org.apache.kafka.jmh.server; + +import kafka.cluster.Partition; +import kafka.cluster.Replica; +import kafka.log.CleanerConfig; +import kafka.log.Log; +import kafka.log.LogConfig; +import kafka.log.LogManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.LogDirFailureChannel; +import kafka.server.MetadataCache; +import kafka.server.QuotaFactory; +import kafka.server.ReplicaManager; +import kafka.utils.KafkaScheduler; +import kafka.utils.MockTime; +import kafka.utils.Scheduler; +import kafka.utils.TestUtils; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Utils; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import scala.Option; +import scala.collection.JavaConverters; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + + +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@Fork(3) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class HighwatermarkCheckpointBench { + @Param({"100", "1000", "2000"}) + public int numTopics; + @Param({"3"}) + public int numPartitions; + + private final String topicName = "foo"; + + private Scheduler scheduler; + + private Metrics metrics; + + private MockTime time; + + private KafkaConfig brokerProperties; + + private ReplicaManager replicaManager; + private 
QuotaFactory.QuotaManagers quotaManagers; + private LogDirFailureChannel failureChannel; + private LogManager logManager; + + + @Setup(Level.Trial) + public void setup() throws Exception { + this.scheduler = new KafkaScheduler(1, "scheduler-thread", true); + this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( + 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(), + Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true)); + this.metrics = new Metrics(); + this.time = new MockTime(); + this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); + final List files = + JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); + this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files), + LogConfig.apply(), + CleanerConfig.apply( + 1, 4*1024*1024L, 0.9d, + 1024*1024, 32*1024*1024, + Double.MAX_VALUE, 15 * 1000, true, "MD5"), + time); + scheduler.startup(); + final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); + final MetadataCache metadataCache = + new MetadataCache(this.brokerProperties.brokerId()); + this.quotaManagers = + QuotaFactory.instantiate(this.brokerProperties, + this.metrics, + this.time, ""); + this.replicaManager = new ReplicaManager( + this.brokerProperties, + this.metrics, + this.time, + null,//this.zkClient, + this.scheduler, + this.logManager, + new AtomicBoolean(false), + this.quotaManagers, + brokerTopicStats, + metadataCache, + this.failureChannel, + Option.empty()); + this.replicaManager.startup(); + + List topicPartitions = new ArrayList<>(); + for (int topicNum = 0; topicNum < numTopics; topicNum++) { + final String topicName = this.topicName + "-" + topicNum; + for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) { + topicPartitions.add(new TopicPartition(topicName, partitionNum)); + } + } + this.replicaManager.checkpointHighWatermarks(); + for (TopicPartition topicPartition : topicPartitions) { + final Partition partition = + this.replicaManager.getOrCreatePartition(topicPartition); + final Log log = this.logManager.getOrCreateLog(topicPartition, + LogConfig.apply(), true, false); + final Replica replica = new Replica(this.brokerProperties.brokerId(), + topicPartition, this.time, 0, Option.apply(log)); + partition.addReplicaIfNotExists(replica); + } + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception { + this.replicaManager.shutdown(false); + this.metrics.close(); + this.scheduler.shutdown(); + this.quotaManagers.shutdown(); + for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) { + Utils.delete(dir); + } + } + + + @Benchmark + @Threads(1) + public void measureCheckpointHighWatermarks() { + this.replicaManager.checkpointHighWatermarks(); + } +} + From 660643f94311cf2ff89716c9cd6abdacaecae16f Mon Sep 17 00:00:00 2001 From: Gardner Vickers Date: Sat, 4 May 2019 23:31:24 -0700 Subject: [PATCH 02/10] Cache calls to `Log.dir.getParent` to avoid extra string allocations when constructing the parent directory path. 
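As a rough standalone sketch of the caching pattern described above (a hypothetical DirHolder class with made-up names, not the actual kafka.log.Log change in the diff below):

    import java.io.File

    // Minimal sketch of the idea: derive the parent path once, keep it in a
    // volatile field, and refresh it whenever the directory is replaced, so hot
    // paths read a cached String instead of calling File.getParent repeatedly.
    class DirHolder(@volatile private var dir: File) {
      @volatile private var cachedParent: String = dir.getParent

      def parentDir: String = cachedParent

      // Callers that swap the directory must refresh the cache, mirroring how
      // Log.renameDir updates its cached parent after the atomic move.
      def moveTo(newDir: File): Unit = {
        dir = newDir
        cachedParent = newDir.getParent
      }
    }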
Avoid extra copying of the logs in `ReplicaManager.checkpointHighWatermarks` --- core/src/main/scala/kafka/log/Log.scala | 4 +++ .../scala/kafka/server/ReplicaManager.scala | 35 +++++++++++++------ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index c6bce27164dec..1faf75440fdd5 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -235,6 +235,9 @@ class Log(@volatile var dir: File, /* last time it was flushed */ private val lastFlushedTime = new AtomicLong(time.milliseconds) + // Cache value of parent directory + @volatile var parentDir: String = dir.getParent + def initFileSize: Int = { if (config.preallocate) config.segmentSize @@ -962,6 +965,7 @@ class Log(@volatile var dir: File, Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath) if (renamedDir != dir) { dir = renamedDir + parentDir = renamedDir.getParent logSegments.foreach(_.updateDir(renamedDir)) producerStateManager.logDir = dir // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 084ffa5c4f44c..a2c41a666f37f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -17,7 +17,8 @@ package kafka.server import java.io.File -import java.util.Optional +import java.util +import java.util.{Optional, function} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.locks.Lock @@ -1598,19 +1599,31 @@ class ReplicaManager(val config: KafkaConfig, def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = nonOfflinePartition(topicPartition).flatMap(_.leaderLogIfLocal.map(_.logEndOffset)) + private def populateHWMMap(mapping: java.util.HashMap[String, util.HashMap[TopicPartition, Long]], optReplica: Option[Replica]): Unit = { + optReplica.foreach(replica => { + if (replica.log.isDefined) { + val dir = replica.log.get.parentDir + val tp = replica.topicPartition + val slot = mapping.computeIfAbsent(dir, new function.Function[String, util.HashMap[TopicPartition, Long]] { + override def apply(t: String): util.HashMap[TopicPartition, Long] = { + return new util.HashMap[TopicPartition, Long]() + } + }) + slot.put(tp, replica.highWatermark.messageOffset) + } + }) + } + // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks(): Unit = { - val localLogs = nonOfflinePartitionsIterator.flatMap { partition => - val logsList: mutable.Set[Log] = mutable.Set() - partition.log.foreach(logsList.add) - partition.futureLog.foreach(logsList.add) - logsList - }.toBuffer - val logsByDir = localLogs.groupBy(_.dir.getParent) - for ((dir, logs) <- logsByDir) { - val hwms = logs.map(log => log.topicPartition -> log.highWatermark).toMap + val hwmMap = new util.HashMap[String, util.HashMap[TopicPartition, Long]](allPartitions.size) + nonOfflinePartitionsIterator.foreach(partition => { + populateHWMMap(hwmMap, partition.localReplica) + populateHWMMap(hwmMap, partition.futureLocalReplica) + }) + for ((dir, checkpoints) <- hwmMap.asScala) { try { - highWatermarkCheckpoints.get(dir).foreach(_.write(hwms)) + highWatermarkCheckpoints.get(dir).foreach(_.write(checkpoints.asScala)) } catch { case e: KafkaStorageException => error(s"Error while 
writing to highwatermark file in directory $dir", e) From 84a933f94ea04ddf8461e21cf4bff0e512a41b9d Mon Sep 17 00:00:00 2001 From: Gardner Vickers Date: Wed, 15 May 2019 15:59:59 -0700 Subject: [PATCH 03/10] Fix license header and Scala 2.11 build failures --- checkstyle/import-control-core.xml | 6 +++ checkstyle/import-control.xml | 5 +++ .../server/HighwatermarkCheckpointBench.java | 41 ++++++++++++------- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 6e5042fd35d8a..5bd9f96f68384 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -58,4 +58,10 @@ + + + + + + diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c4a7662d94ce3..b92d92ccce129 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -293,6 +293,11 @@ + + + + + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java index 2c90d31ffd5d7..c6cfc46ac381c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.kafka.jmh.server; import kafka.cluster.Partition; @@ -16,7 +32,6 @@ import kafka.utils.MockTime; import kafka.utils.Scheduler; import kafka.utils.TestUtils; -import kafka.zk.KafkaZkClient; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Utils; @@ -33,7 +48,7 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import scala.Option; -import scala.collection.JavaConverters; +import scala.collection.JavaConversions; import java.io.File; import java.util.ArrayList; @@ -71,7 +86,7 @@ public class HighwatermarkCheckpointBench { @Setup(Level.Trial) - public void setup() throws Exception { + public void setup() { this.scheduler = new KafkaScheduler(1, "scheduler-thread", true); this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(), @@ -80,14 +95,11 @@ public void setup() throws Exception { this.time = new MockTime(); this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); final List files = - JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); - this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files), - LogConfig.apply(), - CleanerConfig.apply( - 1, 4*1024*1024L, 0.9d, - 1024*1024, 32*1024*1024, - Double.MAX_VALUE, 15 * 1000, true, "MD5"), - time); + JavaConversions.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); + this.logManager = TestUtils.createLogManager(JavaConversions.asScalaBuffer(files), + LogConfig.apply(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d, + 1024 * 1024, 32 * 1024 * 1024, + Double.MAX_VALUE, 15 * 1000, true, "MD5"), time); scheduler.startup(); final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); final MetadataCache metadataCache = @@ -100,7 +112,7 @@ public void setup() throws Exception { this.brokerProperties, this.metrics, this.time, - null,//this.zkClient, + null, this.scheduler, this.logManager, new AtomicBoolean(false), @@ -136,7 +148,7 @@ public void tearDown() throws Exception { this.metrics.close(); this.scheduler.shutdown(); this.quotaManagers.shutdown(); - for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) { + for (File dir : JavaConversions.asJavaCollection(logManager.liveLogDirs())) { Utils.delete(dir); } } @@ -147,5 +159,4 @@ public void tearDown() throws Exception { public void measureCheckpointHighWatermarks() { this.replicaManager.checkpointHighWatermarks(); } -} - +} \ No newline at end of file From 6b5b584718d22382509364e3208c56d4daecd916 Mon Sep 17 00:00:00 2001 From: Gardner Vickers Date: Thu, 23 May 2019 09:02:57 -0700 Subject: [PATCH 04/10] Add spotbugs exclusion for generated JMH code. 
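The XML content of the exclusion hunk below was lost in extraction; as a rough illustration only (not the exact lines added by this patch), a SpotBugs exclude-filter entry for JMH-generated code typically looks like:

    <Match>
        <!-- JMH's annotation processor emits benchmark stubs into *.jmh_generated packages;
             they are generated code, so don't flag them. Package pattern is illustrative. -->
        <Package name="~.*\.jmh_generated"/>
    </Match>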
--- gradle/spotbugs-exclude.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index f3ca3171b6c96..a7174001f78c7 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -212,10 +212,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - + + From 88bba8e6ef11d7d619419e3f1a6428c1d838e282 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 24 Mar 2020 09:32:44 -0700 Subject: [PATCH 05/10] Improve readability and safety --- .../main/scala/kafka/cluster/Partition.scala | 8 +-- core/src/main/scala/kafka/log/Log.scala | 67 ++++++++++--------- .../src/main/scala/kafka/log/LogCleaner.scala | 6 +- .../scala/kafka/log/LogCleanerManager.scala | 6 +- .../src/main/scala/kafka/log/LogManager.scala | 38 +++++------ .../scala/kafka/server/ReplicaManager.scala | 54 +++++++-------- .../kafka/server/ReplicaManagerTest.scala | 2 +- 7 files changed, 90 insertions(+), 91 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e55f89d741a4e..35189e4a32c1f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -266,7 +266,7 @@ class Partition(val topicPartition: TopicPartition, // current replica and the existence of the future replica, no other thread can update the log directory of the // current replica or remove the future replica. inWriteLock(leaderIsrUpdateLock) { - val currentLogDir = localLogOrException.dir.getParent + val currentLogDir = localLogOrException.parentDir if (currentLogDir == logDir) { info(s"Current log directory $currentLogDir is same as requested log dir $logDir. " + s"Skipping future replica creation.") @@ -274,7 +274,7 @@ class Partition(val topicPartition: TopicPartition, } else { futureLog match { case Some(partitionFutureLog) => - val futureLogDir = partitionFutureLog.dir.getParent + val futureLogDir = partitionFutureLog.parentDir if (futureLogDir != logDir) throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " + s"different from the requested log dir $logDir") @@ -310,7 +310,7 @@ class Partition(val topicPartition: TopicPartition, var maybeLog: Option[Log] = None try { val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica) - val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse { + val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse { info(s"No checkpointed highwatermark is found for partition $topicPartition") 0L } @@ -410,7 +410,7 @@ class Partition(val topicPartition: TopicPartition, def futureReplicaDirChanged(newDestinationDir: String): Boolean = { inReadLock(leaderIsrUpdateLock) { - futureLog.exists(_.dir.getParent != newDestinationDir) + futureLog.exists(_.parentDir != newDestinationDir) } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 1faf75440fdd5..20b0cd277d94e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -188,7 +188,7 @@ object RollParams { * New log segments are created according to a configurable policy that controls the size in bytes or time interval * for a given segment. * - * @param dir The directory in which log segments are created. + * @param _dir The directory in which log segments are created. 
* @param config The log configuration settings * @param logStartOffset The earliest offset allowed to be exposed to kafka client. * The logStartOffset can be updated by : @@ -209,7 +209,7 @@ object RollParams { * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired */ @threadsafe -class Log(@volatile var dir: File, +class Log(@volatile private var _dir: File, @volatile var config: LogConfig, @volatile var logStartOffset: Long, @volatile var recoveryPoint: Long, @@ -228,39 +228,17 @@ class Log(@volatile var dir: File, /* A lock that guards all modifications to the log */ private val lock = new Object + // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() // After memory mapped buffer is closed, no disk IO operation should be performed for this log @volatile private var isMemoryMappedBufferClosed = false + // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks + @volatile private var _parentDir: String = dir.getParent + /* last time it was flushed */ private val lastFlushedTime = new AtomicLong(time.milliseconds) - // Cache value of parent directory - @volatile var parentDir: String = dir.getParent - - def initFileSize: Int = { - if (config.preallocate) - config.segmentSize - else - 0 - } - - def updateConfig(newConfig: LogConfig): Unit = { - val oldConfig = this.config - this.config = newConfig - val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion - val newRecordVersion = newConfig.messageFormatVersion.recordVersion - if (newRecordVersion.precedes(oldRecordVersion)) - warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.") - if (newRecordVersion.value != oldRecordVersion.value) - initializeLeaderEpochCache() - } - - private def checkIfMemoryMappedBufferClosed(): Unit = { - if (isMemoryMappedBufferClosed) - throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed") - } - @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ /* The earliest offset which is part of an incomplete transaction. 
This is used to compute the @@ -319,6 +297,35 @@ class Log(@volatile var dir: File, s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms") } + def dir: File = _dir + + def parentDir: String = _parentDir + + def parentDirFile: File = new File(_parentDir) + + def initFileSize: Int = { + if (config.preallocate) + config.segmentSize + else + 0 + } + + def updateConfig(newConfig: LogConfig): Unit = { + val oldConfig = this.config + this.config = newConfig + val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion + val newRecordVersion = newConfig.messageFormatVersion.recordVersion + if (newRecordVersion.precedes(oldRecordVersion)) + warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.") + if (newRecordVersion.value != oldRecordVersion.value) + initializeLeaderEpochCache() + } + + private def checkIfMemoryMappedBufferClosed(): Unit = { + if (isMemoryMappedBufferClosed) + throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed") + } + def highWatermark: Long = highWatermarkMetadata.messageOffset /** @@ -964,8 +971,8 @@ class Log(@volatile var dir: File, val renamedDir = new File(dir.getParent, name) Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath) if (renamedDir != dir) { - dir = renamedDir - parentDir = renamedDir.getParent + _dir = renamedDir + _parentDir = renamedDir.getParent logSegments.foreach(_.updateDir(renamedDir)) producerStateManager.logDir = dir // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 312483dfeefca..22207e0b4e17b 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -315,7 +315,7 @@ class LogCleaner(initialConfig: CleanerConfig, } catch { case e: LogCleaningException => warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e) - cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition) + cleanerManager.markPartitionUncleanable(e.log.parentDir, e.log.topicPartition) false } @@ -365,11 +365,11 @@ class LogCleaner(initialConfig: CleanerConfig, case _: LogCleaningAbortedException => // task can be aborted, let it go. case _: KafkaStorageException => // partition is already offline. let it go. 
case e: IOException => - val logDirectory = cleanable.log.dir.getParent + val logDirectory = cleanable.log.parentDir val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException" logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e) } finally { - cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) + cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.parentDirFile, endOffset) } } diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index fe22a61634e15..ba007c70ac28d 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now) // update checkpoint for logs with invalid checkpointed offsets if (offsetsToClean.forceUpdateCheckpoint) - updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset)) + updateCheckpoints(log.parentDirFile, Option(topicPartition, offsetsToClean.firstDirtyOffset)) val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now) preCleanStats.updateMaxCompactionDelay(compactionDelayMs) @@ -379,7 +379,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], case Some(offset) => // Remove this partition from the checkpoint file in the source log directory updateCheckpoints(sourceLogDir, None) - // Add offset for this partition to the checkpoint file in the source log directory + // Add offset for this partition to the checkpoint file in the destination log directory updateCheckpoints(destLogDir, Option(topicPartition, offset)) case None => } @@ -478,7 +478,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = { inLock(lock) { - uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition)) + uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition)) } } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index d6b14b00652fb..bf4351c70f4b7 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -199,7 +199,7 @@ class LogManager(logDirs: Seq[File], cleaner.handleLogDirFailure(dir) val offlineCurrentTopicPartitions = currentLogs.collect { - case (tp, log) if log.dir.getParent == dir => tp + case (tp, log) if log.parentDir == dir => tp } offlineCurrentTopicPartitions.foreach { topicPartition => { val removedLog = currentLogs.remove(topicPartition) @@ -210,7 +210,7 @@ class LogManager(logDirs: Seq[File], }} val offlineFutureTopicPartitions = futureLogs.collect { - case (tp, log) if log.dir.getParent == dir => tp + case (tp, log) if log.parentDir == dir => tp } offlineFutureTopicPartitions.foreach { topicPartition => { val removedLog = futureLogs.remove(topicPartition) @@ -282,7 +282,7 @@ class LogManager(logDirs: Seq[File], } if (previous != null) { if (log.isFuture) - throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + throw new IllegalStateException(s"Duplicate log directories found: ${log.dir.getAbsolutePath}, 
${previous.dir.getAbsolutePath}") else throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " + s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " + @@ -514,7 +514,7 @@ class LogManager(logDirs: Seq[File], if (log.truncateTo(truncateOffset)) affectedLogs += log if (needToStopCleaner && !isFuture) - cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) + cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset) } finally { if (needToStopCleaner && !isFuture) { cleaner.resumeCleaning(Seq(topicPartition)) @@ -524,7 +524,7 @@ class LogManager(logDirs: Seq[File], } } - for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) { + for ((dir, logs) <- affectedLogs.groupBy(_.parentDirFile)) { checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs) } } @@ -551,7 +551,7 @@ class LogManager(logDirs: Seq[File], try { log.truncateFullyAndStartAt(newOffset) if (cleaner != null && !isFuture) { - cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) + cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset) } } finally { if (cleaner != null && !isFuture) { @@ -559,7 +559,7 @@ class LogManager(logDirs: Seq[File], info(s"Compaction for partition $topicPartition is resumed") } } - checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log)) + checkpointRecoveryOffsetsAndCleanSnapshot(log.parentDirFile, Seq(log)) } } @@ -633,8 +633,8 @@ class LogManager(logDirs: Seq[File], // The logDir should be an absolute path def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = { // Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir - if (!getLog(topicPartition).exists(_.dir.getParent == logDir) && - !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir)) + if (!getLog(topicPartition).exists(_.parentDir == logDir) && + !getLog(topicPartition, isFuture = true).exists(_.parentDir == logDir)) preferredLogDirs.put(topicPartition, logDir) } @@ -723,7 +723,7 @@ class LogManager(logDirs: Seq[File], if (isFuture) { if (preferredLogDir == null) throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory") - else if (getLog(topicPartition).get.dir.getParent == preferredLogDir) + else if (getLog(topicPartition).get.parentDir == preferredLogDir) throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition") } @@ -818,7 +818,7 @@ class LogManager(logDirs: Seq[File], info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.") } catch { case e: KafkaStorageException => - error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e) + error(s"Exception while deleting $removedLog in dir ${removedLog.parentDir}.", e) } } } @@ -866,7 +866,7 @@ class LogManager(logDirs: Seq[File], futureLogs.remove(topicPartition) currentLogs.put(topicPartition, destLog) if (cleaner != null) { - cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile) + cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile) 
cleaner.resumeCleaning(Seq(topicPartition)) info(s"Compaction for partition $topicPartition is resumed") } @@ -876,8 +876,8 @@ class LogManager(logDirs: Seq[File], // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. sourceLog.close() - checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty) - checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile) + checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty) + checkpointLogStartOffsetsInDir(sourceLog.parentDirFile) addLogToBeDeleted(sourceLog) } catch { case e: KafkaStorageException => @@ -911,11 +911,11 @@ class LogManager(logDirs: Seq[File], //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. if (cleaner != null && !isFuture) { cleaner.abortCleaning(topicPartition) - cleaner.updateCheckpoints(removedLog.dir.getParentFile) + cleaner.updateCheckpoints(removedLog.parentDirFile) } removedLog.renameDir(Log.logDeleteDirName(topicPartition)) - checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty) - checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) + checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty) + checkpointLogStartOffsetsInDir(removedLog.parentDirFile) addLogToBeDeleted(removedLog) info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") } else if (offlineLogDirs.nonEmpty) { @@ -934,7 +934,7 @@ class LogManager(logDirs: Seq[File], List(_liveLogDirs.peek()) } else { // count the number of logs in each parent directory (including 0 for empty directories - val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) + val logCounts = allLogs.groupBy(_.parentDir).mapValues(_.size) val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap val dirCounts = (zeros ++ logCounts).toBuffer @@ -1005,7 +1005,7 @@ class LogManager(logDirs: Seq[File], */ private def logsByDir: Map[String, Map[TopicPartition, Log]] = { (this.currentLogs.toList ++ this.futureLogs.toList).toMap - .groupBy { case (_, log) => log.dir.getParent } + .groupBy { case (_, log) => log.parentDir } } // logDir should be an absolute path diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a2c41a666f37f..92ee37d6b9750 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -17,8 +17,7 @@ package kafka.server import java.io.File -import java.util -import java.util.{Optional, function} +import java.util.Optional import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.locks.Lock @@ -481,7 +480,7 @@ class ReplicaManager(val config: KafkaConfig, } def getLogDir(topicPartition: TopicPartition): Option[String] = { - localLog(topicPartition).map(_.dir.getParent) + localLog(topicPartition).map(_.parentDir) } /** @@ -662,7 +661,7 @@ class ReplicaManager(val config: KafkaConfig, * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented. 
*/ def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = { - val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent) + val logsByDir = logManager.allLogs.groupBy(log => log.parentDir) config.logDirs.toSet.map { logDir: String => val absolutePath = new File(logDir).getAbsolutePath @@ -1599,34 +1598,27 @@ class ReplicaManager(val config: KafkaConfig, def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = nonOfflinePartition(topicPartition).flatMap(_.leaderLogIfLocal.map(_.logEndOffset)) - private def populateHWMMap(mapping: java.util.HashMap[String, util.HashMap[TopicPartition, Long]], optReplica: Option[Replica]): Unit = { - optReplica.foreach(replica => { - if (replica.log.isDefined) { - val dir = replica.log.get.parentDir - val tp = replica.topicPartition - val slot = mapping.computeIfAbsent(dir, new function.Function[String, util.HashMap[TopicPartition, Long]] { - override def apply(t: String): util.HashMap[TopicPartition, Long] = { - return new util.HashMap[TopicPartition, Long]() - } - }) - slot.put(tp, replica.highWatermark.messageOffset) - } - }) - } - // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks(): Unit = { - val hwmMap = new util.HashMap[String, util.HashMap[TopicPartition, Long]](allPartitions.size) - nonOfflinePartitionsIterator.foreach(partition => { - populateHWMMap(hwmMap, partition.localReplica) - populateHWMMap(hwmMap, partition.futureLocalReplica) - }) - for ((dir, checkpoints) <- hwmMap.asScala) { - try { - highWatermarkCheckpoints.get(dir).foreach(_.write(checkpoints.asScala)) - } catch { + def populateHwMap(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]], + log: Log): Unit = { + val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir, + new mutable.AnyRefMap[TopicPartition, Long]()) + checkpoints.put(log.topicPartition, log.highWatermark) + } + + val logDirToCheckpoints = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]]( + allPartitions.size) + nonOfflinePartitionsIterator.foreach { partition => + partition.log.foreach(populateHwMap(logDirToCheckpoints, _)) + partition.futureLog.foreach(populateHwMap(logDirToCheckpoints, _)) + } + + for ((logDir, checkpoints) <- logDirToCheckpoints) { + try highWatermarkCheckpoints.get(logDir).foreach(_.write(checkpoints)) + catch { case e: KafkaStorageException => - error(s"Error while writing to highwatermark file in directory $dir", e) + error(s"Error while writing to highwatermark file in directory $logDir", e) } } } @@ -1645,11 +1637,11 @@ class ReplicaManager(val config: KafkaConfig, warn(s"Stopping serving replicas in dir $dir") replicaStateChangeLock synchronized { val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition => - partition.log.exists { _.dir.getParent == dir } + partition.log.exists { _.parentDir == dir } }.map(_.topicPartition).toSet val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition => - partition.futureLog.exists { _.dir.getParent == dir } + partition.futureLog.exists { _.parentDir == dir } }.toSet replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index e20ed10ebb698..f5b648733bc27 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -1293,7 +1293,7 @@ class ReplicaManagerTest { val mockBrokerTopicStats = new BrokerTopicStats val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) val mockLog = new Log( - dir = new File(new File(config.logDirs.head), s"$topic-0"), + _dir = new File(new File(config.logDirs.head), s"$topic-0"), config = LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, From d66e1adcea15ce0f2461160ada36cb2a0d54ea56 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 24 Mar 2020 11:59:15 -0700 Subject: [PATCH 06/10] Various fixes --- .gitignore | 1 + .../main/scala/kafka/cluster/Partition.scala | 30 +++-- .../scala/kafka/server/ReplicaManager.scala | 22 ++-- .../kafka/cluster/AssignmentStateTest.scala | 2 +- .../kafka/cluster/PartitionLockTest.scala | 12 +- .../unit/kafka/cluster/PartitionTest.scala | 110 +++++++----------- .../kafka/server/ReplicaManagerTest.scala | 43 ++++--- .../ReplicaFetcherThreadBenchmark.java | 2 +- .../PartitionMakeFollowerBenchmark.java | 4 +- .../UpdateFollowerFetchStateBenchmark.java | 2 +- .../server/HighwatermarkCheckpointBench.java | 51 +++++--- 11 files changed, 129 insertions(+), 150 deletions(-) diff --git a/.gitignore b/.gitignore index 4bdb94e3c8216..f17d9dd6c6e47 100644 --- a/.gitignore +++ b/.gitignore @@ -54,4 +54,5 @@ systest/ clients/src/generated clients/src/generated-test jmh-benchmarks/generated +jmh-benchmarks/src/main/generated streams/src/generated diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 35189e4a32c1f..0c2fccbf5cafa 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -19,7 +19,7 @@ package kafka.cluster import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.{Optional, Properties} -import kafka.api.{ApiVersion, LeaderAndIsr, Request} +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.common.UnexpectedAppendOffsetException import kafka.controller.KafkaController import kafka.log._ @@ -280,28 +280,28 @@ class Partition(val topicPartition: TopicPartition, s"different from the requested log dir $logDir") false case None => - createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, highWatermarkCheckpoints) + createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints) true } } } } - def createLogIfNotExists(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = { + def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = { isFutureReplica match { case true if futureLog.isEmpty => - val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints) + val log = createLog(isNew, isFutureReplica, offsetCheckpoints) this.futureLog = Option(log) case false if log.isEmpty => - val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints) + val log = createLog(isNew, isFutureReplica, offsetCheckpoints) this.log = Option(log) case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.") } } // Visible for testing - private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { - val fetchLogConfig = () => { + private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { + def 
fetchLogConfig: LogConfig = { val props = stateStore.fetchTopicConfig() LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) } @@ -309,7 +309,7 @@ class Partition(val topicPartition: TopicPartition, logManager.initializingLog(topicPartition) var maybeLog: Option[Log] = None try { - val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica) + val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig, isNew, isFutureReplica) val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse { info(s"No checkpointed highwatermark is found for partition $topicPartition") 0L @@ -319,7 +319,7 @@ class Partition(val topicPartition: TopicPartition, maybeLog = Some(log) log } finally { - logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig) + logManager.finishedInitializingLog(topicPartition, maybeLog, () => fetchLogConfig) } } @@ -478,9 +478,7 @@ class Partition(val topicPartition: TopicPartition, * from the time when this broker was the leader last time) and setting the new leader and ISR. * If the leader replica id does not change, return false to indicate the replica manager. */ - def makeLeader(controllerId: Int, - partitionState: LeaderAndIsrPartitionState, - correlationId: Int, + def makeLeader(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { // record the epoch of the controller that made the leadership decision. This is useful while updating the isr @@ -493,7 +491,7 @@ class Partition(val topicPartition: TopicPartition, addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt), removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt) ) - createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) val leaderLog = localLogOrException val leaderEpochStartOffset = leaderLog.logEndOffset @@ -549,9 +547,7 @@ class Partition(val topicPartition: TopicPartition, * greater (that is, no updates have been missed), return false to indicate to the * replica manager that state is already correct and the become-follower steps can be skipped */ - def makeFollower(controllerId: Int, - partitionState: LeaderAndIsrPartitionState, - correlationId: Int, + def makeFollower(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { inWriteLock(leaderIsrUpdateLock) { val newLeaderBrokerId = partitionState.leader @@ -566,7 +562,7 @@ class Partition(val topicPartition: TopicPartition, addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt), removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt) ) - createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) leaderEpoch = partitionState.leaderEpoch leaderEpochStartOffsetOpt = None diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 92ee37d6b9750..b50d9beced412 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1303,7 +1303,7 @@ class ReplicaManager(val config: KafkaConfig, val leader = 
BrokerEndPoint(config.brokerId, "localhost", -1) // Add future replica to partition's map - partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, + partition.createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints) // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move @@ -1369,7 +1369,7 @@ class ReplicaManager(val config: KafkaConfig, // Update the partition information to be the leader partitionStates.foreach { case (partition, partitionState) => try { - if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) { + if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) { partitionsToMakeLeaders += partition stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " + s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " + @@ -1451,7 +1451,7 @@ class ReplicaManager(val config: KafkaConfig, metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { // Only change partition state when the leader is available case Some(_) => - if (partition.makeFollower(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) + if (partition.makeFollower(partitionState, highWatermarkCheckpoints)) partitionsToMakeFollower += partition else stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " + @@ -1468,7 +1468,7 @@ class ReplicaManager(val config: KafkaConfig, s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") // Create the local replica even if the leader is unavailable. This is required to ensure that we include // the partition's high watermark in the checkpoint file (see KAFKA-1647) - partition.createLogIfNotExists(localBrokerId, isNew = partitionState.isNew, isFutureReplica = false, + partition.createLogIfNotExists(isNew = partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) } } catch { @@ -1600,22 +1600,22 @@ class ReplicaManager(val config: KafkaConfig, // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks(): Unit = { - def populateHwMap(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]], - log: Log): Unit = { + def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]], + log: Log): Unit = { val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Long]()) checkpoints.put(log.topicPartition, log.highWatermark) } - val logDirToCheckpoints = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]]( + val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]]( allPartitions.size) nonOfflinePartitionsIterator.foreach { partition => - partition.log.foreach(populateHwMap(logDirToCheckpoints, _)) - partition.futureLog.foreach(populateHwMap(logDirToCheckpoints, _)) + partition.log.foreach(putHw(logDirToHws, _)) + partition.futureLog.foreach(putHw(logDirToHws, _)) } - for ((logDir, checkpoints) <- logDirToCheckpoints) { - try highWatermarkCheckpoints.get(logDir).foreach(_.write(checkpoints)) + for ((logDir, hws) <- logDirToHws) { + try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws)) catch { case e: KafkaStorageException => error(s"Error while writing to highwatermark file in directory 
$logDir", e) diff --git a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala index 08dd95ec7d567..b218bf7297431 100644 --- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala @@ -112,7 +112,7 @@ class AssignmentStateTest(isr: List[Integer], replicas: List[Integer], if (original.nonEmpty) partition.assignmentState = SimpleAssignmentState(original) // do the test - partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) + partition.makeLeader(leaderState, offsetCheckpoints) assertEquals(isReassigning, partition.isReassigning) if (adding.nonEmpty) adding.foreach(r => assertTrue(partition.isAddingReplica(r))) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index cb1fd80876e0f..c8a7d33a5d27a 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -222,8 +222,8 @@ class PartitionLockTest extends Logging { } } - override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { - val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints) + override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { + val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints) new SlowLog(log, mockTime, appendSemaphore) } } @@ -235,21 +235,21 @@ class PartitionLockTest extends Logging { when(stateStore.expandIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr])) .thenReturn(Some(2)) - partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) val controllerId = 0 val controllerEpoch = 0 val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava val isr = replicas - assertTrue("Expected become leader transition to succeed", partition.makeLeader(controllerId, new LeaderAndIsrPartitionState() + assertTrue("Expected become leader transition to succeed", partition.makeLeader(new LeaderAndIsrPartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) .setIsr(isr) .setZkVersion(1) .setReplicas(replicas) - .setIsNew(true), 0, offsetCheckpoints)) + .setIsNew(true), offsetCheckpoints)) partition } @@ -310,4 +310,4 @@ class PartitionLockTest extends Logging { appendInfo } } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 7967428c49fbc..62ec28b2733fe 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -109,7 +109,7 @@ class PartitionTest extends AbstractPartitionTest { val latch = new CountDownLatch(1) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath) - partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints) + partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath) 
     partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
@@ -153,13 +153,13 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       logManager) {
-      override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
-        val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+      override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+        val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints)
         new SlowLog(log, mockTime, appendSemaphore)
       }
     }
-    partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     val appendThread = new Thread {
       override def run(): Unit = {
@@ -180,7 +180,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
       .setIsNew(false)
-    assertTrue(partition.makeFollower(0, partitionState, 0, offsetCheckpoints))
+    assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
 
     appendSemaphore.release()
     appendThread.join()
@@ -194,7 +194,7 @@ class PartitionTest extends AbstractPartitionTest {
   // active segment
   def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
-    partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
     partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
@@ -465,7 +465,6 @@ class PartitionTest extends AbstractPartitionTest {
     val leader = brokerId
     val follower1 = brokerId + 1
     val follower2 = brokerId + 2
-    val controllerId = brokerId + 3
     val replicas = List(leader, follower1, follower2)
     val isr = List[Integer](leader, follower2).asJava
     val leaderEpoch = 8
@@ -486,7 +485,7 @@
       .setIsNew(true)
     assertTrue("Expected first makeLeader() to return 'leader changed'",
-      partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
+      partition.makeLeader(leaderState, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
     assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
@@ -561,7 +560,7 @@
       .setReplicas(replicas.map(Int.box).asJava)
       .setIsNew(false)
-    assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints))
+    assertTrue(partition.makeFollower(followerState, offsetCheckpoints))
 
     // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
     val newLeaderState = new LeaderAndIsrPartitionState()
@@ -573,7 +572,7 @@
       .setReplicas(replicas.map(Int.box).asJava)
       .setIsNew(false)
-    assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints))
+    assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints))
 
     // Try to get offsets as a client
     fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
@@ -636,34 +635,33 @@ class PartitionTest extends AbstractPartitionTest {
 
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
                                       log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
     val isr = replicas
 
     if (isLeader) {
       assertTrue("Expected become leader transition to succeed",
-        partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+        partition.makeLeader(new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
           .setLeaderEpoch(leaderEpoch)
           .setIsr(isr)
           .setZkVersion(1)
           .setReplicas(replicas)
-          .setIsNew(true), 0, offsetCheckpoints))
+          .setIsNew(true), offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
     } else {
       assertTrue("Expected become follower transition to succeed",
-        partition.makeFollower(controllerId, new LeaderAndIsrPartitionState()
+        partition.makeFollower(new LeaderAndIsrPartitionState()
          .setControllerEpoch(controllerEpoch)
          .setLeader(brokerId + 1)
          .setLeaderEpoch(leaderEpoch)
          .setIsr(isr)
          .setZkVersion(1)
          .setReplicas(replicas)
-          .setIsNew(true), 0, offsetCheckpoints))
+          .setIsNew(true), offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
       assertEquals(None, partition.leaderLogIfLocal)
     }
@@ -673,7 +671,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val log = partition.localLogOrException
 
     val initialLogStartOffset = 5L
@@ -723,7 +721,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testListOffsetIsolationLevels(): Unit = {
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
@@ -731,17 +728,17 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     assertTrue("Expected become leader transition to succeed",
-      partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+      partition.makeLeader(new LeaderAndIsrPartitionState()
        .setControllerEpoch(controllerEpoch)
        .setLeader(brokerId)
        .setLeaderEpoch(leaderEpoch)
        .setIsr(isr)
        .setZkVersion(1)
        .setReplicas(replicas)
-        .setIsNew(true), 0, offsetCheckpoints))
+        .setIsNew(true), offsetCheckpoints))
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
 
     val records = createTransactionalRecords(List(
@@ -811,7 +808,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
       .setIsNew(false)
-    partition.makeFollower(0, partitionState, 0, offsetCheckpoints)
+    partition.makeFollower(partitionState, offsetCheckpoints)
 
     // Request with same leader and epoch increases by only 1, do become-follower steps
     partitionState = new LeaderAndIsrPartitionState()
@@ -822,7 +819,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
       .setIsNew(false)
-    assertTrue(partition.makeFollower(0, partitionState, 2, offsetCheckpoints))
+    assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
 
     // Request with same leader and same epoch, skip become-follower steps
     partitionState = new LeaderAndIsrPartitionState()
@@ -832,7 +829,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
-    assertFalse(partition.makeFollower(0, partitionState, 2, offsetCheckpoints))
+    assertFalse(partition.makeFollower(partitionState, offsetCheckpoints))
   }
 
   @Test
@@ -841,7 +838,6 @@ class PartitionTest extends AbstractPartitionTest {
     val leader = brokerId
     val follower1 = brokerId + 1
     val follower2 = brokerId + 2
-    val controllerId = brokerId + 3
     val replicas = List[Integer](leader, follower1, follower2).asJava
     val isr = List[Integer](leader, follower2).asJava
     val leaderEpoch = 8
@@ -862,7 +858,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setReplicas(replicas)
       .setIsNew(true)
     assertTrue("Expected first makeLeader() to return 'leader changed'",
-      partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
+      partition.makeLeader(leaderState, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
     assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
@@ -898,7 +894,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints)
+    partition.makeFollower(followerState, offsetCheckpoints)
 
     val newLeaderState = new LeaderAndIsrPartitionState()
       .setControllerEpoch(controllerEpoch)
@@ -909,7 +905,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setReplicas(replicas)
       .setIsNew(false)
     assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
-      partition.makeLeader(controllerEpoch, newLeaderState, 2, offsetCheckpoints))
+      partition.makeLeader(newLeaderState, offsetCheckpoints))
 
     val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
 
     // append records with the latest leader epoch
@@ -937,7 +933,6 @@ class PartitionTest extends AbstractPartitionTest {
    */
   @Test
   def testDelayedFetchAfterAppendRecords(): Unit = {
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
@@ -978,7 +973,7 @@ class PartitionTest extends AbstractPartitionTest {
         .setZkVersion(1)
         .setReplicas(replicaIds)
         .setIsNew(true)
-      partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+      partition.makeLeader(leaderState, offsetCheckpoints)
       partitions += partition
     }
@@ -1035,8 +1030,7 @@ class PartitionTest extends AbstractPartitionTest {
   }
 
   def createTransactionalRecords(records: Iterable[SimpleRecord],
-                                 baseOffset: Long,
-                                 partitionLeaderEpoch: Int = 0): MemoryRecords = {
+                                 baseOffset: Long): MemoryRecords = {
     val producerId = 1L
     val producerEpoch = 0.toShort
     val baseSequence = 0
@@ -1058,7 +1052,6 @@ class PartitionTest extends AbstractPartitionTest {
     val leader = brokerId
     val follower1 = brokerId + 1
     val follower2 = brokerId + 2
-    val controllerId = brokerId + 3
     val replicas = List[Integer](leader, follower1, follower2).asJava
     val isr = List[Integer](leader).asJava
     val leaderEpoch = 8
@@ -1073,7 +1066,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(true)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
 
     assertTrue(partition.isAtMinIsr)
   }
@@ -1082,7 +1075,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 6, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1091,12 +1083,11 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     val initializeTimeMs = time.milliseconds()
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1105,7 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas)
           .setIsNew(true),
-        0,
         offsetCheckpoints))
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
@@ -1146,7 +1136,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1155,11 +1144,10 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1168,7 +1156,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas.map(Int.box).asJava)
           .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1213,7 +1200,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1222,10 +1208,9 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1234,7 +1219,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas)
           .setIsNew(true),
-        0,
         offsetCheckpoints))
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1268,7 +1252,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1278,11 +1261,10 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1291,7 +1273,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
          .setReplicas(replicas.map(Int.box).asJava)
          .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1325,7 +1306,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1335,11 +1315,10 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1348,7 +1327,6 @@ class PartitionTest extends AbstractPartitionTest {
          .setZkVersion(1)
          .setReplicas(replicas.map(Int.box).asJava)
          .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1399,7 +1377,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1409,11 +1386,10 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1422,7 +1398,6 @@ class PartitionTest extends AbstractPartitionTest {
          .setZkVersion(1)
          .setReplicas(replicas.map(Int.box).asJava)
          .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1458,7 +1433,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1468,10 +1442,9 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1480,7 +1453,6 @@ class PartitionTest extends AbstractPartitionTest {
          .setZkVersion(1)
          .setReplicas(replicas)
          .setIsNew(true),
-        0,
         offsetCheckpoints))
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1513,7 +1485,6 @@ class PartitionTest extends AbstractPartitionTest {
     when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
       .thenReturn(Some(4L))
 
-    val controllerId = 0
     val controllerEpoch = 3
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
     val leaderState = new LeaderAndIsrPartitionState()
@@ -1524,7 +1495,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertEquals(4, partition.localLogOrException.highWatermark)
   }
@@ -1553,7 +1524,6 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testUnderReplicatedPartitionsCorrectSemantics(): Unit = {
-    val controllerId = 0
     val controllerEpoch = 3
     val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava
     val isr = List[Integer](brokerId, brokerId + 1).asJava
@@ -1566,11 +1536,11 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertTrue(partition.isUnderReplicated)
 
     leaderState = leaderState.setIsr(replicas)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertFalse(partition.isUnderReplicated)
   }
@@ -1626,7 +1596,7 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       spyLogManager)
 
-    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1660,7 +1630,7 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       spyLogManager)
 
-    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1695,7 +1665,7 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       spyLogManager)
 
-    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
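The PartitionTest changes above all follow the same pattern: the broker id, controller id and correlation id arguments disappear from Partition's public calls. As a rough Java sketch of the resulting call shape (illustrative only, assuming a `partition`, `replicas` list and `offsetCheckpoints` prepared the way the JMH benchmarks further down prepare them):

    import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;

    LeaderAndIsrPartitionState leaderState = new LeaderAndIsrPartitionState()
        .setControllerEpoch(0)
        .setLeader(0)
        .setLeaderEpoch(5)
        .setIsr(replicas)          // java.util.List<Integer>, as in the tests above
        .setZkVersion(1)
        .setReplicas(replicas)
        .setIsNew(true);
    // controllerId and correlationId are no longer threaded through the call:
    boolean leaderChanged = partition.makeLeader(leaderState, offsetCheckpoints);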
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f5b648733bc27..4b600204bcfb6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -89,7 +89,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
@@ -109,7 +109,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
@@ -164,7 +164,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -216,7 +216,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
       replicaManager.createPartition(topicPartition)
-        .createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -270,7 +270,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -330,7 +330,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -436,7 +436,7 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -512,7 +512,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -668,8 +668,8 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val tp1 = new TopicPartition(topic, 1)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
-    replicaManager.createPartition(tp1).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
     val partition1Replicas = Seq[Integer](0, 2).asJava
     val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -782,10 +782,10 @@ class ReplicaManagerTest {
     val tp = new TopicPartition(topic, topicPartition)
     val partition = replicaManager.createPartition(tp)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    partition.createLogIfNotExists(followerBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
-    partition.makeFollower(controllerId,
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.makeFollower(
       leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
-      correlationId, offsetCheckpoints)
+      offsetCheckpoints)
 
     // Make local partition a follower - because epoch increased by more than 1, truncation should
     // trigger even though leader does not change
@@ -808,7 +808,6 @@ class ReplicaManagerTest {
     val topicPartition = 0
     val followerBrokerId = 0
     val leaderBrokerId = 1
-    val controllerId = 0
     val leaderEpoch = 1
     val leaderEpochIncrement = 2
     val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
@@ -823,11 +822,9 @@ class ReplicaManagerTest {
     val partition = replicaManager.createPartition(tp)
 
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     partition.makeLeader(
-      controllerId,
       leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
-      correlationId,
       offsetCheckpoints
     )
@@ -977,7 +974,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
     val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
@@ -1016,7 +1013,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1064,7 +1061,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1112,7 +1109,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1154,7 +1151,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1193,7 +1190,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
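In ReplicaManagerTest the same simplification shows up through createLogIfNotExists, which no longer needs the local broker id. A minimal sketch of the new test-side sequence, in Java for consistency with the benchmarks below (names as used in the hunks above; the surrounding fixture, error handling, and the Scala/Java interop details are assumptions of this sketch):

    Partition partition = replicaManager.createPartition(new TopicPartition("topic", 0));
    OffsetCheckpoints checkpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints());
    partition.createLogIfNotExists(false, false, checkpoints);   // isNew, isFutureReplica, checkpoints
    replicaManager.checkpointHighWatermarks();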
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index a03c9bf4b3083..017926cd13945 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -159,7 +159,7 @@ public void setup() throws IOException {
                     0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
                     Mockito.mock(MetadataCache.class), logManager);
-            partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+            partition.makeFollower(partitionState, offsetCheckpoints);
             pool.put(tp, partition);
             offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0));
             BaseRecords fetched = new BaseRecords() {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 2919eba3c14fd..d78f3e6d5112d 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -121,7 +121,7 @@ public void setup() throws IOException {
             ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
             partitionStateStore, delayedOperations, Mockito.mock(MetadataCache.class), logManager);
-        partition.createLogIfNotExists(0, true, false, offsetCheckpoints);
+        partition.createLogIfNotExists(true, false, offsetCheckpoints);
         executorService.submit((Runnable) () -> {
             SimpleRecord[] simpleRecords = new SimpleRecord[] {
                 new SimpleRecord(1L, "foo".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)),
@@ -154,7 +154,7 @@ public boolean testMakeFollower() {
             .setZkVersion(1)
             .setReplicas(replicas)
             .setIsNew(true);
-        return partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+        return partition.makeFollower(partitionState, offsetCheckpoints);
     }
 
     private static LogConfig createLogConfig() {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index c3e074685b561..a686c3bcdc4d0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -119,7 +119,7 @@ public void setUp() {
             ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
             partitionStateStore, delayedOperations, Mockito.mock(MetadataCache.class), logManager);
-        partition.makeLeader(0, partitionState, 0, offsetCheckpoints);
+        partition.makeLeader(partitionState, offsetCheckpoints);
     }
 
     // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
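Each of the three benchmarks above only had to drop the two extra arguments; the measured body stays a single call. For example, the measured method in PartitionMakeFollowerBenchmark now reduces to something like the following (the state and checkpoints are built once in the @Setup method, as in the diff):

    @Benchmark
    public boolean testMakeFollower() {
        // partitionState and offsetCheckpoints are prepared in setup(); only the
        // two-argument transition remains on the hot path.
        return partition.makeFollower(partitionState, offsetCheckpoints);
    }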
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
index c6cfc46ac381c..9903925000264 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
@@ -16,10 +16,11 @@
  */
 package org.apache.kafka.jmh.server;
 
+import java.util.HashMap;
+import java.util.Properties;
 import kafka.cluster.Partition;
-import kafka.cluster.Replica;
+import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
-import kafka.log.Log;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
 import kafka.server.BrokerTopicStats;
@@ -28,13 +29,19 @@
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
+import kafka.server.checkpoints.LazyOffsetCheckpoints;
+import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.utils.KafkaScheduler;
 import kafka.utils.MockTime;
 import kafka.utils.Scheduler;
 import kafka.utils.TestUtils;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
@@ -48,7 +55,6 @@
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 import scala.Option;
-import scala.collection.JavaConversions;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -56,6 +62,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import scala.collection.JavaConverters;
 
 
 @Warmup(iterations = 5)
@@ -64,8 +71,10 @@
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 @State(Scope.Benchmark)
 public class HighwatermarkCheckpointBench {
+
     @Param({"100", "1000", "2000"})
     public int numTopics;
+
     @Param({"3"})
     public int numPartitions;
 
@@ -90,13 +99,13 @@ public void setup() {
         this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
         this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
                 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
-                Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true));
+                Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1));
         this.metrics = new Metrics();
         this.time = new MockTime();
         this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
         final List files =
-            JavaConversions.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
-        this.logManager = TestUtils.createLogManager(JavaConversions.asScalaBuffer(files),
+            JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
             LogConfig.apply(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
                 1024 * 1024, 32 * 1024 * 1024,
                 Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
@@ -108,11 +117,17 @@ public void setup() {
             QuotaFactory.instantiate(this.brokerProperties,
                 this.metrics,
                 this.time, "");
+        KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
+            @Override
+            public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
+                return new Properties();
+            }
+        };
         this.replicaManager = new ReplicaManager(
             this.brokerProperties,
             this.metrics,
             this.time,
-            null,
+            zkClient,
             this.scheduler,
             this.logManager,
             new AtomicBoolean(false),
@@ -121,7 +136,7 @@ public void setup() {
             metadataCache,
             this.failureChannel,
             Option.empty());
-        this.replicaManager.startup();
+        replicaManager.startup();
 
         List topicPartitions = new ArrayList<>();
         for (int topicNum = 0; topicNum < numTopics; topicNum++) {
@@ -130,16 +145,16 @@ public void setup() {
                 topicPartitions.add(new TopicPartition(topicName, partitionNum));
             }
         }
-        this.replicaManager.checkpointHighWatermarks();
+
+        PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
+        Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
+        OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
         for (TopicPartition topicPartition : topicPartitions) {
-            final Partition partition =
-                this.replicaManager.getOrCreatePartition(topicPartition);
-            final Log log = this.logManager.getOrCreateLog(topicPartition,
-                LogConfig.apply(), true, false);
-            final Replica replica = new Replica(this.brokerProperties.brokerId(),
-                topicPartition, this.time, 0, Option.apply(log));
-            partition.addReplicaIfNotExists(replica);
+            final Partition partition = this.replicaManager.createPartition(topicPartition);
+            partition.createLogIfNotExists(true, false, checkpoints);
         }
+
+        replicaManager.checkpointHighWatermarks();
     }
 
     @TearDown(Level.Trial)
@@ -148,7 +163,7 @@ public void tearDown() throws Exception {
         this.metrics.close();
         this.scheduler.shutdown();
         this.quotaManagers.shutdown();
-        for (File dir : JavaConversions.asJavaCollection(logManager.liveLogDirs())) {
+        for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
             Utils.delete(dir);
         }
     }
@@ -159,4 +174,4 @@ public void tearDown() throws Exception {
     public void measureCheckpointHighWatermarks() {
         this.replicaManager.checkpointHighWatermarks();
     }
-}
\ No newline at end of file
+}
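Two small stubs carry the benchmark change above: topic configs are answered locally instead of going through ZooKeeper, and every log directory reports a checkpointed high watermark of 0, so setup never touches real checkpoint files. Extracted here for reference (illustrative; the surrounding setup() is shown in the diff above):

    KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
        @Override
        public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
            return new Properties();   // no topic-level overrides
        }
    };
    OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);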
From 064814576d8748f145ada7845850fc310b13fad5 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 24 Mar 2020 12:00:32 -0700
Subject: [PATCH 07/10] Remove slf4j-nop

---
 build.gradle | 1 -
 1 file changed, 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index 338a8ba4b36fb..6144095b6061b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1547,7 +1547,6 @@ project(':jmh-benchmarks') {
     compile libs.mockitoCore
     annotationProcessor libs.jmhGeneratorAnnProcess
     compile libs.jmhCoreBenchmarks
-    compile group: 'org.slf4j', name: 'slf4j-nop', version: '1.7.26'
   }
 
   jar {

From e8ba5f28bb4c5a237c3706d7fe413c98edbc88b5 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 24 Mar 2020 12:06:05 -0700
Subject: [PATCH 08/10] Fix checkstyle

---
 checkstyle/import-control-core.xml                |  6 ------
 checkstyle/import-control-jmh-benchmarks.xml      |  1 +
 checkstyle/import-control.xml                     | 16 ----------------
 .../jmh/server/HighwatermarkCheckpointBench.java  |  3 ---
 4 files changed, 1 insertion(+), 25 deletions(-)

diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 5bd9f96f68384..6e5042fd35d8a 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -58,10 +58,4 @@
-
-
-
-
-
-
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 3536ccbaf5697..4b546cb6835d0 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -39,6 +39,7 @@
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b92d92ccce129..3e41e9778f580 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -284,22 +284,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
index 9903925000264..52316b7c1ef1e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.jmh.server;
 
-import java.util.HashMap;
 import java.util.Properties;
 import kafka.cluster.Partition;
 import kafka.cluster.PartitionStateStore;
@@ -29,13 +28,11 @@
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
-import kafka.server.checkpoints.LazyOffsetCheckpoints;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.utils.KafkaScheduler;
 import kafka.utils.MockTime;
 import kafka.utils.Scheduler;
 import kafka.utils.TestUtils;
-import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;

From c2c6f3ca358be4d8ea942c329d567ec1eabc39e4 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 24 Mar 2020 19:34:20 -0700
Subject: [PATCH 09/10] Use CollectionConverters instead of JavaConverters

---
 .../kafka/jmh/server/HighwatermarkCheckpointBench.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
index 52316b7c1ef1e..9cb8ac6a97a9e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
@@ -59,7 +59,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import scala.collection.JavaConverters;
+import scala.jdk.CollectionConverters;
 
 
 @Warmup(iterations = 5)
@@ -101,8 +101,8 @@ public void setup() {
         this.time = new MockTime();
         this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
         final List files =
-            JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
-        this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
+            CollectionConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        this.logManager = TestUtils.createLogManager(CollectionConverters.asScalaBuffer(files),
             LogConfig.apply(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
                 1024 * 1024, 32 * 1024 * 1024,
                 Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
@@ -160,7 +160,7 @@ public void tearDown() throws Exception {
         this.metrics.close();
         this.scheduler.shutdown();
         this.quotaManagers.shutdown();
-        for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
+        for (File dir : CollectionConverters.asJavaCollection(logManager.liveLogDirs())) {
             Utils.delete(dir);
         }
     }

From 026c910037cefb470a78349d6ad8ac4b734f4172 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 24 Mar 2020 19:35:58 -0700
Subject: [PATCH 10/10] Add slf4jlog4j to jmh benchmarks

---
 build.gradle | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index 6144095b6061b..092f4ff1d3385 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1544,9 +1544,10 @@ project(':jmh-benchmarks') {
     compile project(':clients').sourceSets.test.output
     compile project(':core').sourceSets.test.output
     compile libs.jmhCore
-    compile libs.mockitoCore
     annotationProcessor libs.jmhGeneratorAnnProcess
     compile libs.jmhCoreBenchmarks
+    compile libs.mockitoCore
+    compile libs.slf4jlog4j
   }
 
   jar {