From 1df8b2dac2e441330442205a1ef3eef000c5393c Mon Sep 17 00:00:00 2001 From: Divij Vaidya Date: Fri, 21 Apr 2023 14:41:54 +0200 Subject: [PATCH 1/5] Remove metrics on Log Cleaner shutdown --- .../src/main/scala/kafka/log/LogCleaner.scala | 16 ++++++++-- .../scala/unit/kafka/log/LogCleanerTest.scala | 30 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 25511976e31fa..0aa7e9dbdc880 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig, */ def shutdown(): Unit = { info("Shutting down the log cleaner.") - cleaners.foreach(_.shutdown()) - cleaners.clear() + try { + cleaners.foreach(_.shutdown()) + cleaners.clear() + } finally { + remoteMetrics() + } + } + + def remoteMetrics(): Unit = { + metricsGroup.removeMetric("max-buffer-utilization-percent") + metricsGroup.removeMetric("cleaner-recopy-percent") + metricsGroup.removeMetric("max-clean-time-secs") + metricsGroup.removeMetric("max-compaction-delay-secs") + metricsGroup.removeMetric("DeadThreadCount") } override def reconfigurableConfigs: Set[String] = { diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index fdfa61c10e80f..6912bf22457a7 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -25,9 +25,12 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} +import org.mockito.ArgumentMatchers.{any, anyString} +import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions} import java.io.{File, RandomAccessFile} import java.nio._ @@ -62,6 +65,33 @@ class LogCleanerTest { Utils.delete(tmpdir) } + @Test + def testRemoveMetricsOnClose(): Unit = { + val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) + try { + val logCleaner = new LogCleaner(new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + + // shutdown logCleaner so that metrics are removed + logCleaner.shutdown() + + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) + val numMetricsRegistered = 5 + verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) + verify(mockMetricsGroup, times(numMetricsRegistered)).removeMetric(anyString()) + + // assert that we have verified all invocations on + verifyNoMoreInteractions(mockMetricsGroup) + } finally { + if (mockMetricsGroupCtor != null) { + mockMetricsGroupCtor.close() + } + } + } + /** * Test simple log cleaning */ From c705c0767af5c159b6c17c58cd4be1f8ae2a32fb Mon Sep 17 00:00:00 2001 From: Divij Vaidya Date: Tue, 2 May 2023 15:08:36 +0200 Subject: [PATCH 2/5] Add metric names to const --- .../src/main/scala/kafka/log/LogCleaner.scala | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 0aa7e9dbdc880..3ba0a03b1e8fd 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -124,29 +124,37 @@ class LogCleaner(initialConfig: CleanerConfig, private def maxOverCleanerThreads(f: CleanerThread => Double): Int = cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt + private val maxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" + private val cleanerRecopyPercentMetricName = "cleaner-recopy-percent" + private val maxCleanTimeMetricName = "max-clean-time-secs" + private val maxCompactionDelayMetricsName = "max-compaction-delay-secs" + private val deadThreadCountMetricName = "DeadThreadCount" + private val metricNames = Set.apply(maxBufferUtilizationPercentMetricName, + cleanerRecopyPercentMetricName, + maxCleanTimeMetricName, + maxCompactionDelayMetricsName, + deadThreadCountMetricName) /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ - metricsGroup.newGauge("max-buffer-utilization-percent", + metricsGroup.newGauge(maxBufferUtilizationPercentMetricName, () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100) /* a metric to track the recopy rate of each thread's last cleaning */ - metricsGroup.newGauge("cleaner-recopy-percent", () => { + metricsGroup.newGauge(cleanerRecopyPercentMetricName, () => { val stats = cleaners.map(_.lastStats) val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1) (100 * recopyRate).toInt }) /* a metric to track the maximum cleaning time for the last cleaning from each thread */ - metricsGroup.newGauge("max-clean-time-secs", - () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) - + metricsGroup.newGauge(maxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) // a metric to track delay between the time when a log is required to be compacted // as determined by max compaction lag and the time of last cleaner run. - metricsGroup.newGauge("max-compaction-delay-secs", + metricsGroup.newGauge(maxCompactionDelayMetricsName, () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000) - metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount) + metricsGroup.newGauge(deadThreadCountMetricName, () => deadThreadCount) private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed) @@ -171,16 +179,12 @@ class LogCleaner(initialConfig: CleanerConfig, cleaners.foreach(_.shutdown()) cleaners.clear() } finally { - remoteMetrics() + removeMetrics() } } - def remoteMetrics(): Unit = { - metricsGroup.removeMetric("max-buffer-utilization-percent") - metricsGroup.removeMetric("cleaner-recopy-percent") - metricsGroup.removeMetric("max-clean-time-secs") - metricsGroup.removeMetric("max-compaction-delay-secs") - metricsGroup.removeMetric("DeadThreadCount") + def removeMetrics(): Unit = { + metricNames.foreach(metricsGroup.removeMetric); } override def reconfigurableConfigs: Set[String] = { From 222a5118690386c16f188ccf4a0f4d66b344639a Mon Sep 17 00:00:00 2001 From: Divij Vaidya Date: Tue, 2 May 2023 17:30:40 +0200 Subject: [PATCH 3/5] Use companion object --- .../src/main/scala/kafka/log/LogCleaner.scala | 39 ++++++++++--------- .../scala/unit/kafka/log/LogCleanerTest.scala | 8 +++- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 3ba0a03b1e8fd..abaee65fd921b 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -22,6 +22,7 @@ import java.nio._ import java.util.Date import java.util.concurrent.TimeUnit import kafka.common._ +import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName} import kafka.server.{BrokerReconfigurable, KafkaConfig} import kafka.utils._ import org.apache.kafka.common.{KafkaException, TopicPartition} @@ -124,37 +125,26 @@ class LogCleaner(initialConfig: CleanerConfig, private def maxOverCleanerThreads(f: CleanerThread => Double): Int = cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt - private val maxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" - private val cleanerRecopyPercentMetricName = "cleaner-recopy-percent" - private val maxCleanTimeMetricName = "max-clean-time-secs" - private val maxCompactionDelayMetricsName = "max-compaction-delay-secs" - private val deadThreadCountMetricName = "DeadThreadCount" - private val metricNames = Set.apply(maxBufferUtilizationPercentMetricName, - cleanerRecopyPercentMetricName, - maxCleanTimeMetricName, - maxCompactionDelayMetricsName, - deadThreadCountMetricName) - /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ - metricsGroup.newGauge(maxBufferUtilizationPercentMetricName, + metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100) /* a metric to track the recopy rate of each thread's last cleaning */ - metricsGroup.newGauge(cleanerRecopyPercentMetricName, () => { + metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => { val stats = cleaners.map(_.lastStats) val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1) (100 * recopyRate).toInt }) /* a metric to track the maximum cleaning time for the last cleaning from each thread */ - metricsGroup.newGauge(maxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) + metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) // a metric to track delay between the time when a log is required to be compacted // as determined by max compaction lag and the time of last cleaner run. - metricsGroup.newGauge(maxCompactionDelayMetricsName, + metricsGroup.newGauge(MaxCompactionDelayMetricsName, () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000) - metricsGroup.newGauge(deadThreadCountMetricName, () => deadThreadCount) + metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount) private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed) @@ -184,7 +174,7 @@ class LogCleaner(initialConfig: CleanerConfig, } def removeMetrics(): Unit = { - metricNames.foreach(metricsGroup.removeMetric); + LogCleaner.MetricNames.foreach(metricsGroup.removeMetric) } override def reconfigurableConfigs: Set[String] = { @@ -205,14 +195,14 @@ class LogCleaner(initialConfig: CleanerConfig, /** * Reconfigure log clean config. The will: - * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary + * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary * 2. stop current log cleaners and create new ones. * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config. */ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { config = LogCleaner.cleanerConfig(newConfig) - val maxIoBytesPerSecond = config.maxIoBytesPerSecond; + val maxIoBytesPerSecond = config.maxIoBytesPerSecond if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) { info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond") throttler.updateDesiredRatePerSec(maxIoBytesPerSecond) @@ -482,6 +472,17 @@ object LogCleaner { config.logCleanerEnable) } + + private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" + private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent" + private val MaxCleanTimeMetricName = "max-clean-time-secs" + private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs" + private val DeadThreadCountMetricName = "DeadThreadCount" + private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName, + CleanerRecopyPercentMetricName, + MaxCleanTimeMetricName, + MaxCompactionDelayMetricsName, + DeadThreadCountMetricName) } /** diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 6912bf22457a7..145dec1134c27 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -81,7 +81,13 @@ class LogCleanerTest { val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) val numMetricsRegistered = 5 verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - verify(mockMetricsGroup, times(numMetricsRegistered)).removeMetric(anyString()) + + // verify that all metrics are added to the list of metric name + assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered, + "All metrics are not part of MetricNames collections") + + // verify that each metric is removed + LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) // assert that we have verified all invocations on verifyNoMoreInteractions(mockMetricsGroup) From 250d4ceed9e48916d72c9795c3c30f487751eb93 Mon Sep 17 00:00:00 2001 From: Divij Vaidya Date: Wed, 3 May 2023 14:59:37 +0200 Subject: [PATCH 4/5] Address cr comments --- core/src/main/scala/kafka/log/LogCleaner.scala | 4 +++- core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 6 ++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index abaee65fd921b..b8b991aeabed4 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -478,7 +478,9 @@ object LogCleaner { private val MaxCleanTimeMetricName = "max-clean-time-secs" private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs" private val DeadThreadCountMetricName = "DeadThreadCount" - private[log] val MetricNames = Set(MaxBufferUtilizationPercentMetricName, + // package private for testing + private[log] val MetricNames = Set( + MaxBufferUtilizationPercentMetricName, CleanerRecopyPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 145dec1134c27..20f993ec6523e 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -83,7 +83,7 @@ class LogCleanerTest { verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) // verify that all metrics are added to the list of metric name - assertEquals(LogCleaner.MetricNames.size, numMetricsRegistered, + assertEquals(numMetricsRegistered, LogCleaner.MetricNames.size, "All metrics are not part of MetricNames collections") // verify that each metric is removed @@ -92,9 +92,7 @@ class LogCleanerTest { // assert that we have verified all invocations on verifyNoMoreInteractions(mockMetricsGroup) } finally { - if (mockMetricsGroupCtor != null) { - mockMetricsGroupCtor.close() - } + mockMetricsGroupCtor.close() } } From a9c97d9878a69a9cc758a5d891c16ea702b7f519 Mon Sep 17 00:00:00 2001 From: Divij Vaidya Date: Thu, 4 May 2023 11:41:15 +0200 Subject: [PATCH 5/5] Address PR comment --- core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 20f993ec6523e..6a1f0f1b911d9 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -79,13 +79,9 @@ class LogCleanerTest { logCleaner.shutdown() val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) - val numMetricsRegistered = 5 + val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - - // verify that all metrics are added to the list of metric name - assertEquals(numMetricsRegistered, LogCleaner.MetricNames.size, - "All metrics are not part of MetricNames collections") - + // verify that each metric is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))