From 82ada93ae5343f81a418b6d5da76d5b4fedd80cc Mon Sep 17 00:00:00 2001 From: Bob Barrett Date: Tue, 2 Jun 2020 09:52:20 -0700 Subject: [PATCH 1/2] KAFKA-9788: Use distinct names for transaction and group load time sensors Sensor objects are stored in the Kafka metrics registry and keyed by name. If a new sensor is created with the same name as an existing one, the existing one is returned rather than a new object being created. The partition load time sensors for the transaction and group coordinators used the same name, so data recorded to either was stored in the same object. This meant that the metrics values for both metrics were identical and consisted of the combined data. This patch changes the names to be distinct so that the data will be stored in separate Sensor objects. --- .../kafka/coordinator/group/GroupMetadataManager.scala | 4 ++-- .../transaction/TransactionStateManager.scala | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 48806e697b9ec..d914343fa1da8 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -49,9 +49,9 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} -import scala.jdk.CollectionConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ class GroupMetadataManager(brokerId: Int, interBrokerProtocolVersion: ApiVersion, @@ -89,7 +89,7 @@ class GroupMetadataManager(brokerId: Int, private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]() /* setup metrics*/ - val partitionLoadSensor = metrics.sensor("PartitionLoadTime") + private val partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime") partitionLoadSensor.add(metrics.metricName("partition-load-time-max", "group-coordinator-metrics", diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index a96857c558607..8eb65df1d3393 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -48,6 +48,9 @@ object TransactionStateManager { val DefaultTransactionalIdExpirationMs: Int = TimeUnit.DAYS.toMillis(7).toInt val DefaultAbortTimedOutTransactionsIntervalMs: Int = TimeUnit.SECONDS.toMillis(10).toInt val DefaultRemoveExpiredTransactionalIdsIntervalMs: Int = TimeUnit.HOURS.toMillis(1).toInt + + val MetricsGroup: String = "transaction-coordinator-metrics" + val LoadTimeSensor: String = "TransactionsPartitionLoadTime" } /** @@ -95,13 +98,13 @@ class TransactionStateManager(brokerId: Int, private val transactionTopicPartitionCount = getTransactionTopicPartitionCount /** setup metrics*/ - private val partitionLoadSensor = metrics.sensor("PartitionLoadTime") + private val partitionLoadSensor = metrics.sensor(TransactionStateManager.LoadTimeSensor) partitionLoadSensor.add(metrics.metricName("partition-load-time-max", - "transaction-coordinator-metrics", + TransactionStateManager.MetricsGroup, "The max time it took to load the partitions in the last 30sec"), new Max()) partitionLoadSensor.add(metrics.metricName("partition-load-time-avg", - "transaction-coordinator-metrics", + TransactionStateManager.MetricsGroup, "The avg time it took to load the partitions in the last 30sec"), new Avg()) // visible for testing only From 68127aebf45308e4942eb456037273520c14bab7 Mon Sep 17 00:00:00 2001 From: Bob Barrett Date: Tue, 2 Jun 2020 10:10:53 -0700 Subject: [PATCH 2/2] Add group metrics constants --- .../kafka/coordinator/group/GroupMetadataManager.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index d914343fa1da8..9a548250d7b83 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -89,13 +89,13 @@ class GroupMetadataManager(brokerId: Int, private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]() /* setup metrics*/ - private val partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime") + private val partitionLoadSensor = metrics.sensor(GroupMetadataManager.LoadTimeSensor) partitionLoadSensor.add(metrics.metricName("partition-load-time-max", - "group-coordinator-metrics", + GroupMetadataManager.MetricsGroup, "The max time it took to load the partitions in the last 30sec"), new Max()) partitionLoadSensor.add(metrics.metricName("partition-load-time-avg", - "group-coordinator-metrics", + GroupMetadataManager.MetricsGroup, "The avg time it took to load the partitions in the last 30sec"), new Avg()) val offsetCommitsSensor = metrics.sensor("OffsetCommits") @@ -992,6 +992,9 @@ class GroupMetadataManager(brokerId: Int, * -> value version 0: [protocol_type, generation, protocol, leader, members] */ object GroupMetadataManager { + // Metrics names + val MetricsGroup: String = "group-coordinator-metrics" + val LoadTimeSensor: String = "GroupPartitionLoadTime" private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort