Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio._
import java.util.Date
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
Expand Down Expand Up @@ -124,29 +125,26 @@ class LogCleaner(initialConfig: CleanerConfig,
/* Evaluate `f` on every cleaner thread and return the largest value, truncated to Int.
 * An empty cleaner list yields 0 (the fold's initial value). */
private def maxOverCleanerThreads(f: CleanerThread => Double): Int = {
  val maxValue = cleaners.iterator.map(f).foldLeft(0.0d)(math.max)
  maxValue.toInt
}


/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
metricsGroup.newGauge("max-buffer-utilization-percent",
metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
() => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)

/* a metric to track the recopy rate of each thread's last cleaning */
metricsGroup.newGauge("cleaner-recopy-percent", () => {
metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
val stats = cleaners.map(_.lastStats)
val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1)
(100 * recopyRate).toInt
})

/* a metric to track the maximum cleaning time for the last cleaning from each thread */
metricsGroup.newGauge("max-clean-time-secs",
() => maxOverCleanerThreads(_.lastStats.elapsedSecs))

metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))

// a metric to track delay between the time when a log is required to be compacted
// as determined by max compaction lag and the time of last cleaner run.
metricsGroup.newGauge("max-compaction-delay-secs",
metricsGroup.newGauge(MaxCompactionDelayMetricsName,
() => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)

metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount)
metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)

private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)

Expand All @@ -167,8 +165,16 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
def shutdown(): Unit = {
info("Shutting down the log cleaner.")
cleaners.foreach(_.shutdown())
cleaners.clear()
try {
cleaners.foreach(_.shutdown())
cleaners.clear()
} finally {
removeMetrics()
}
}

def removeMetrics(): Unit = {
LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
}

override def reconfigurableConfigs: Set[String] = {
Expand All @@ -189,14 +195,14 @@ class LogCleaner(initialConfig: CleanerConfig,

/**
* Reconfigure log cleaner config. This will:
* 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
* 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary
* 2. stop current log cleaners and create new ones.
* That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
*/
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
config = LogCleaner.cleanerConfig(newConfig)

val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
val maxIoBytesPerSecond = config.maxIoBytesPerSecond
if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
Expand Down Expand Up @@ -466,6 +472,19 @@ object LogCleaner {
config.logCleanerEnable)

}

// Names of the gauges registered by LogCleaner, kept as constants so that
// registration and removal (see removeMetrics()) always agree on the names.
// NOTE(review): "MaxCompactionDelayMetricsName" is inconsistent with the other
// "...MetricName" constants — consider renaming in a follow-up.
private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
private val MaxCleanTimeMetricName = "max-clean-time-secs"
private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
private val DeadThreadCountMetricName = "DeadThreadCount"
// The complete set of metric names; used to deregister every gauge on shutdown.
// package private for testing
private[log] val MetricNames = Set(
MaxBufferUtilizationPercentMetricName,
CleanerRecopyPercentMetricName,
MaxCleanTimeMetricName,
MaxCompactionDelayMetricsName,
DeadThreadCountMetricName)
}

/**
Expand Down
30 changes: 30 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}

import java.io.{File, RandomAccessFile}
import java.nio._
Expand Down Expand Up @@ -62,6 +65,33 @@ class LogCleanerTest {
Utils.delete(tmpdir)
}

/** Verifies that shutting down the LogCleaner removes every metric it registered:
  * exactly one newGauge call per name in LogCleaner.MetricNames, and a matching
  * removeMetric call for each on shutdown. */
@Test
def testRemoveMetricsOnClose(): Unit = {
// Intercept construction of KafkaMetricsGroup so gauge registration/removal can be verified.
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)

// shutdown logCleaner so that metrics are removed
logCleaner.shutdown()

val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
val numMetricsRegistered = LogCleaner.MetricNames.size
verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())

// verify that each metric is removed
LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))

// assert that we have verified all invocations on the mocked metrics group
verifyNoMoreInteractions(mockMetricsGroup)
} finally {
// restore real KafkaMetricsGroup construction for subsequent tests
mockMetricsGroupCtor.close()
}
}

/**
* Test simple log cleaning
*/
Expand Down