Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 47 additions & 24 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName}
import kafka.server.ReplicaManager.createLogReadResult
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.server.metadata.ZkMetadataCache
Expand Down Expand Up @@ -177,6 +178,39 @@ object HostedPartition {
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"

private val LeaderCountMetricName = "LeaderCount"
private val PartitionCountMetricName = "PartitionCount"
private val OfflineReplicaCountMetricName = "OfflineReplicaCount"
private val UnderReplicatedPartitionsMetricName = "UnderReplicatedPartitions"
private val UnderMinIsrPartitionCountMetricName = "UnderMinIsrPartitionCount"
private val AtMinIsrPartitionCountMetricName = "AtMinIsrPartitionCount"
private val ReassigningPartitionsMetricName = "ReassigningPartitions"
private val PartitionsWithLateTransactionsCountMetricName = "PartitionsWithLateTransactionsCount"
private val ProducerIdCountMetricName = "ProducerIdCount"
private val IsrExpandsPerSecMetricName = "IsrExpandsPerSec"
private val IsrShrinksPerSecMetricName = "IsrShrinksPerSec"
private val FailedIsrUpdatesPerSecMetricName = "FailedIsrUpdatesPerSec"

private[server] val GaugeMetricNames = Set(
LeaderCountMetricName,
PartitionCountMetricName,
OfflineReplicaCountMetricName,
UnderReplicatedPartitionsMetricName,
UnderMinIsrPartitionCountMetricName,
AtMinIsrPartitionCountMetricName,
ReassigningPartitionsMetricName,
PartitionsWithLateTransactionsCountMetricName,
ProducerIdCountMetricName
)

private[server] val MeterMetricNames = Set(
IsrExpandsPerSecMetricName,
IsrShrinksPerSecMetricName,
FailedIsrUpdatesPerSecMetricName
)

private[server] val MetricNames = GaugeMetricNames.union(MeterMetricNames)

def createLogReadResult(highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
Expand Down Expand Up @@ -282,16 +316,16 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing
private[server] val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()

metricsGroup.newGauge("LeaderCount", () => leaderPartitionsIterator.size)
metricsGroup.newGauge(LeaderCountMetricName, () => leaderPartitionsIterator.size)
// Visible for testing
private[kafka] val partitionCount = metricsGroup.newGauge("PartitionCount", () => allPartitions.size)
metricsGroup.newGauge("OfflineReplicaCount", () => offlinePartitionCount)
metricsGroup.newGauge("UnderReplicatedPartitions", () => underReplicatedPartitionCount)
metricsGroup.newGauge("UnderMinIsrPartitionCount", () => leaderPartitionsIterator.count(_.isUnderMinIsr))
metricsGroup.newGauge("AtMinIsrPartitionCount", () => leaderPartitionsIterator.count(_.isAtMinIsr))
metricsGroup.newGauge("ReassigningPartitions", () => reassigningPartitionsCount)
metricsGroup.newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount)
metricsGroup.newGauge("ProducerIdCount", () => producerIdCount)
private[kafka] val partitionCount = metricsGroup.newGauge(PartitionCountMetricName, () => allPartitions.size)
metricsGroup.newGauge(OfflineReplicaCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(UnderReplicatedPartitionsMetricName, () => underReplicatedPartitionCount)
metricsGroup.newGauge(UnderMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isUnderMinIsr))
metricsGroup.newGauge(AtMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isAtMinIsr))
metricsGroup.newGauge(ReassigningPartitionsMetricName, () => reassigningPartitionsCount)
metricsGroup.newGauge(PartitionsWithLateTransactionsCountMetricName, () => lateTransactionsCount)
metricsGroup.newGauge(ProducerIdCountMetricName, () => producerIdCount)

def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning)

Expand All @@ -302,9 +336,9 @@ class ReplicaManager(val config: KafkaConfig,

def producerIdCount: Int = onlinePartitionsIterator.map(_.producerIdCount).sum

val isrExpandRate: Meter = metricsGroup.newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate: Meter = metricsGroup.newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate: Meter = metricsGroup.newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
val isrExpandRate: Meter = metricsGroup.newMeter(IsrExpandsPerSecMetricName, "expands", TimeUnit.SECONDS)
val isrShrinkRate: Meter = metricsGroup.newMeter(IsrShrinksPerSecMetricName, "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate: Meter = metricsGroup.newMeter(FailedIsrUpdatesPerSecMetricName, "failedUpdates", TimeUnit.SECONDS)

def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)

Expand Down Expand Up @@ -2160,18 +2194,7 @@ class ReplicaManager(val config: KafkaConfig,
}

def removeMetrics(): Unit = {
metricsGroup.removeMetric("LeaderCount")
metricsGroup.removeMetric("PartitionCount")
metricsGroup.removeMetric("OfflineReplicaCount")
metricsGroup.removeMetric("UnderReplicatedPartitions")
metricsGroup.removeMetric("UnderMinIsrPartitionCount")
metricsGroup.removeMetric("AtMinIsrPartitionCount")
metricsGroup.removeMetric("ReassigningPartitions")
metricsGroup.removeMetric("PartitionsWithLateTransactionsCount")
metricsGroup.removeMetric("ProducerIdCount")
metricsGroup.removeMetric("IsrExpandsPerSec")
metricsGroup.removeMetric("IsrShrinksPerSec")
metricsGroup.removeMetric("FailedIsrUpdatesPerSec")
ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric)
}

def beginControlledShutdown(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,9 @@ class ReplicaManagerTest {
// Use the second instance of metrics group that is constructed. The first instance is constructed by
// ReplicaManager constructor > BrokerTopicStats > BrokerTopicMetrics.
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(1)
verify(mockMetricsGroup, times(9)).newGauge(anyString(), any())
verify(mockMetricsGroup, times(3)).newMeter(anyString(), anyString(), any(classOf[TimeUnit]))
verify(mockMetricsGroup, times(12)).removeMetric(anyString())
ReplicaManager.GaugeMetricNames.foreach(metricName => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
ReplicaManager.MeterMetricNames.foreach(metricName => verify(mockMetricsGroup).newMeter(ArgumentMatchers.eq(metricName), anyString(), any(classOf[TimeUnit])))
ReplicaManager.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))

// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
Expand Down