diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 48806e697b9ec..9a548250d7b83 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -49,9 +49,9 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} -import scala.jdk.CollectionConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ class GroupMetadataManager(brokerId: Int, interBrokerProtocolVersion: ApiVersion, @@ -89,13 +89,13 @@ class GroupMetadataManager(brokerId: Int, private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]() /* setup metrics*/ - val partitionLoadSensor = metrics.sensor("PartitionLoadTime") + private val partitionLoadSensor = metrics.sensor(GroupMetadataManager.LoadTimeSensor) partitionLoadSensor.add(metrics.metricName("partition-load-time-max", - "group-coordinator-metrics", + GroupMetadataManager.MetricsGroup, "The max time it took to load the partitions in the last 30sec"), new Max()) partitionLoadSensor.add(metrics.metricName("partition-load-time-avg", - "group-coordinator-metrics", + GroupMetadataManager.MetricsGroup, "The avg time it took to load the partitions in the last 30sec"), new Avg()) val offsetCommitsSensor = metrics.sensor("OffsetCommits") @@ -992,6 +992,9 @@ class GroupMetadataManager(brokerId: Int, * -> value version 0: [protocol_type, generation, protocol, leader, members] */ object GroupMetadataManager { + // Metrics names + val MetricsGroup: String = "group-coordinator-metrics" + val LoadTimeSensor: String = "GroupPartitionLoadTime" private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index a96857c558607..8eb65df1d3393 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -48,6 +48,9 @@ object TransactionStateManager { val DefaultTransactionalIdExpirationMs: Int = TimeUnit.DAYS.toMillis(7).toInt val DefaultAbortTimedOutTransactionsIntervalMs: Int = TimeUnit.SECONDS.toMillis(10).toInt val DefaultRemoveExpiredTransactionalIdsIntervalMs: Int = TimeUnit.HOURS.toMillis(1).toInt + + val MetricsGroup: String = "transaction-coordinator-metrics" + val LoadTimeSensor: String = "TransactionsPartitionLoadTime" } /** @@ -95,13 +98,13 @@ class TransactionStateManager(brokerId: Int, private val transactionTopicPartitionCount = getTransactionTopicPartitionCount /** setup metrics*/ - private val partitionLoadSensor = metrics.sensor("PartitionLoadTime") + private val partitionLoadSensor = metrics.sensor(TransactionStateManager.LoadTimeSensor) partitionLoadSensor.add(metrics.metricName("partition-load-time-max", - "transaction-coordinator-metrics", + TransactionStateManager.MetricsGroup, "The max time it took to load the partitions in the last 30sec"), new Max()) partitionLoadSensor.add(metrics.metricName("partition-load-time-avg", - "transaction-coordinator-metrics", + TransactionStateManager.MetricsGroup, "The avg time it took to load the partitions in the last 30sec"), new Avg()) // visible for testing only