diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 25511976e31fa..b8b991aeabed4 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -22,6 +22,7 @@ import java.nio._ import java.util.Date import java.util.concurrent.TimeUnit import kafka.common._ +import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName} import kafka.server.{BrokerReconfigurable, KafkaConfig} import kafka.utils._ import org.apache.kafka.common.{KafkaException, TopicPartition} @@ -124,29 +125,26 @@ class LogCleaner(initialConfig: CleanerConfig, private def maxOverCleanerThreads(f: CleanerThread => Double): Int = cleaners.foldLeft(0.0d)((max: Double, thread: CleanerThread) => math.max(max, f(thread))).toInt - /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ - metricsGroup.newGauge("max-buffer-utilization-percent", + metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100) /* a metric to track the recopy rate of each thread's last cleaning */ - metricsGroup.newGauge("cleaner-recopy-percent", () => { + metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => { val stats = cleaners.map(_.lastStats) val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1) (100 * recopyRate).toInt }) /* a metric to track the maximum cleaning time for the last cleaning from each thread */ - metricsGroup.newGauge("max-clean-time-secs", - () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) - + metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs)) // a metric to track delay between the time when a log is required to be compacted // as determined by max compaction lag and the time of last cleaner run. - metricsGroup.newGauge("max-compaction-delay-secs", + metricsGroup.newGauge(MaxCompactionDelayMetricsName, () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000) - metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount) + metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount) private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed) @@ -167,8 +165,16 @@ class LogCleaner(initialConfig: CleanerConfig, */ def shutdown(): Unit = { info("Shutting down the log cleaner.") - cleaners.foreach(_.shutdown()) - cleaners.clear() + try { + cleaners.foreach(_.shutdown()) + cleaners.clear() + } finally { + removeMetrics() + } + } + + def removeMetrics(): Unit = { + LogCleaner.MetricNames.foreach(metricsGroup.removeMetric) } override def reconfigurableConfigs: Set[String] = { @@ -189,14 +195,14 @@ class LogCleaner(initialConfig: CleanerConfig, /** * Reconfigure log clean config. The will: - * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary + * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary * 2. stop current log cleaners and create new ones. * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config. */ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { config = LogCleaner.cleanerConfig(newConfig) - val maxIoBytesPerSecond = config.maxIoBytesPerSecond; + val maxIoBytesPerSecond = config.maxIoBytesPerSecond if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) { info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond") throttler.updateDesiredRatePerSec(maxIoBytesPerSecond) @@ -466,6 +472,19 @@ object LogCleaner { config.logCleanerEnable) } + + private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent" + private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent" + private val MaxCleanTimeMetricName = "max-clean-time-secs" + private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs" + private val DeadThreadCountMetricName = "DeadThreadCount" + // package private for testing + private[log] val MetricNames = Set( + MaxBufferUtilizationPercentMetricName, + CleanerRecopyPercentMetricName, + MaxCleanTimeMetricName, + MaxCompactionDelayMetricsName, + DeadThreadCountMetricName) } /** diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index fdfa61c10e80f..6a1f0f1b911d9 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -25,9 +25,12 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} +import org.mockito.ArgumentMatchers.{any, anyString} +import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions} import java.io.{File, RandomAccessFile} import java.nio._ @@ -62,6 +65,33 @@ class LogCleanerTest { Utils.delete(tmpdir) } + @Test + def testRemoveMetricsOnClose(): Unit = { + val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) + try { + val logCleaner = new LogCleaner(new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + + // shutdown logCleaner so that metrics are removed + logCleaner.shutdown() + + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) + val numMetricsRegistered = LogCleaner.MetricNames.size + verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) + + // verify that each metric is removed + LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + + // assert that we have verified all invocations on + verifyNoMoreInteractions(mockMetricsGroup) + } finally { + mockMetricsGroupCtor.close() + } + } + /** * Test simple log cleaning */