diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c559593f7756d..912b3072988db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -531,12 +531,14 @@ private KafkaConsumer(ConsumerConfig config, throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); this.time = new SystemTime(); - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); + Map metricsTags = new LinkedHashMap(); + metricsTags.put("client-id", clientId); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + .tags(metricsTags); List reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); @@ -546,11 +548,9 @@ private KafkaConsumer(ConsumerConfig config, List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); String metricGrpPrefix = "consumer"; - Map metricsTags = new LinkedHashMap(); - metricsTags.put("client-id", clientId); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient netClient = new NetworkClient( - new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder), + new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder), this.metadata, clientId, 100, // a fixed large enough value will suffice @@ -573,7 +573,6 @@ private KafkaConsumer(ConsumerConfig config, this.subscriptions, metrics, metricGrpPrefix, - metricsTags, this.time, retryBackoffMs, new ConsumerCoordinator.DefaultOffsetCommitCallback(), @@ -606,7 +605,6 @@ private KafkaConsumer(ConsumerConfig config, this.subscriptions, metrics, metricGrpPrefix, - metricsTags, this.time, this.retryBackoffMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 33886edad9813..c6492bc66d2a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -14,7 +14,6 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; @@ -107,7 +106,6 @@ public AbstractCoordinator(ConsumerNetworkClient client, int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, - Map metricTags, Time time, long retryBackoffMs) { this.client = client; @@ -119,7 +117,7 @@ public AbstractCoordinator(ConsumerNetworkClient client, this.sessionTimeoutMs = sessionTimeoutMs; this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds()); this.heartbeatTask = new HeartbeatTask(); - this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); + this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; } @@ -679,47 +677,39 @@ private class GroupCoordinatorMetrics { public final Sensor joinLatency; public final Sensor syncLatency; - public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map tags) { + public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { this.metrics = metrics; this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; this.heartbeatLatency = metrics.sensor("heartbeat-latency"); - this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max", + this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max", this.metricGrpName, - "The max time taken to receive a response to a heartbeat request", - tags), new Max()); - this.heartbeatLatency.add(new MetricName("heartbeat-rate", + "The max time taken to receive a response to a heartbeat request"), new Max()); + this.heartbeatLatency.add(metrics.metricName("heartbeat-rate", this.metricGrpName, - "The average number of heartbeats per second", - tags), new Rate(new Count())); + "The average number of heartbeats per second"), new Rate(new Count())); this.joinLatency = metrics.sensor("join-latency"); - this.joinLatency.add(new MetricName("join-time-avg", + this.joinLatency.add(metrics.metricName("join-time-avg", this.metricGrpName, - "The average time taken for a group rejoin", - tags), new Avg()); - this.joinLatency.add(new MetricName("join-time-max", + "The average time taken for a group rejoin"), new Avg()); + this.joinLatency.add(metrics.metricName("join-time-max", this.metricGrpName, - "The max time taken for a group rejoin", - tags), new Avg()); - this.joinLatency.add(new MetricName("join-rate", + "The max time taken for a group rejoin"), new Avg()); + this.joinLatency.add(metrics.metricName("join-rate", this.metricGrpName, - "The number of group joins per second", - tags), new Rate(new Count())); + "The number of group joins per second"), new Rate(new Count())); this.syncLatency = metrics.sensor("sync-latency"); - this.syncLatency.add(new MetricName("sync-time-avg", + this.syncLatency.add(metrics.metricName("sync-time-avg", this.metricGrpName, - "The average time taken for a group sync", - tags), new Avg()); - this.syncLatency.add(new MetricName("sync-time-max", + "The average time taken for a group sync"), new Avg()); + this.syncLatency.add(metrics.metricName("sync-time-max", this.metricGrpName, - "The max time taken for a group sync", - tags), new Avg()); - this.syncLatency.add(new MetricName("sync-rate", + "The max time taken for a group sync"), new Avg()); + this.syncLatency.add(metrics.metricName("sync-rate", this.metricGrpName, - "The number of group syncs per second", - tags), new Rate(new Count())); + "The number of group syncs per second"), new Rate(new Count())); Measurable lastHeartbeat = new Measurable() { @@ -727,10 +717,9 @@ public double measure(MetricConfig config, long now) { return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); } }; - metrics.addMetric(new MetricName("last-heartbeat-seconds-ago", + metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", this.metricGrpName, - "The number of seconds since the last controller heartbeat", - tags), + "The number of seconds since the last controller heartbeat"), lastHeartbeat); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 4ac05a3a68f4d..41d2a279c3bd2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -82,7 +81,6 @@ public ConsumerCoordinator(ConsumerNetworkClient client, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, - Map metricTags, Time time, long retryBackoffMs, OffsetCommitCallback defaultOffsetCommitCallback, @@ -94,7 +92,6 @@ public ConsumerCoordinator(ConsumerNetworkClient client, heartbeatIntervalMs, metrics, metricGrpPrefix, - metricTags, time, retryBackoffMs); this.metadata = metadata; @@ -109,7 +106,7 @@ public ConsumerCoordinator(ConsumerNetworkClient client, addMetadataListener(); this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null; - this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); + this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix); } @Override @@ -639,23 +636,20 @@ private class ConsumerCoordinatorMetrics { public final Sensor commitLatency; - public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map tags) { + public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { this.metrics = metrics; this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; this.commitLatency = metrics.sensor("commit-latency"); - this.commitLatency.add(new MetricName("commit-latency-avg", + this.commitLatency.add(metrics.metricName("commit-latency-avg", this.metricGrpName, - "The average time taken for a commit request", - tags), new Avg()); - this.commitLatency.add(new MetricName("commit-latency-max", + "The average time taken for a commit request"), new Avg()); + this.commitLatency.add(metrics.metricName("commit-latency-max", this.metricGrpName, - "The max time taken for a commit request", - tags), new Max()); - this.commitLatency.add(new MetricName("commit-rate", + "The max time taken for a commit request"), new Max()); + this.commitLatency.add(metrics.metricName("commit-rate", this.metricGrpName, - "The number of commit calls per second", - tags), new Rate(new Count())); + "The number of commit calls per second"), new Rate(new Count())); Measurable numParts = new Measurable() { @@ -663,11 +657,9 @@ public double measure(MetricConfig config, long now) { return subscriptions.assignedPartitions().size(); } }; - metrics.addMetric(new MetricName("assigned-partitions", + metrics.addMetric(metrics.metricName("assigned-partitions", this.metricGrpName, - "The number of partitions currently assigned to this consumer", - tags), - numParts); + "The number of partitions currently assigned to this consumer"), numParts); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 57088695d51e2..e152088bf3e19 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -99,7 +98,6 @@ public Fetcher(ConsumerNetworkClient client, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, - Map metricTags, Time time, long retryBackoffMs) { @@ -120,7 +118,7 @@ public Fetcher(ConsumerNetworkClient client, this.unauthorizedTopics = new HashSet<>(); this.recordTooLargePartitions = new HashMap<>(); - this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags); + this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; } @@ -656,64 +654,53 @@ private class FetchManagerMetrics { public final Sensor fetchThrottleTimeSensor; - public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map tags) { + public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) { this.metrics = metrics; this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics"; this.bytesFetched = metrics.sensor("bytes-fetched"); - this.bytesFetched.add(new MetricName("fetch-size-avg", + this.bytesFetched.add(metrics.metricName("fetch-size-avg", this.metricGrpName, - "The average number of bytes fetched per request", - tags), new Avg()); - this.bytesFetched.add(new MetricName("fetch-size-max", + "The average number of bytes fetched per request"), new Avg()); + this.bytesFetched.add(metrics.metricName("fetch-size-max", this.metricGrpName, - "The maximum number of bytes fetched per request", - tags), new Max()); - this.bytesFetched.add(new MetricName("bytes-consumed-rate", + "The maximum number of bytes fetched per request"), new Max()); + this.bytesFetched.add(metrics.metricName("bytes-consumed-rate", this.metricGrpName, - "The average number of bytes consumed per second", - tags), new Rate()); + "The average number of bytes consumed per second"), new Rate()); this.recordsFetched = metrics.sensor("records-fetched"); - this.recordsFetched.add(new MetricName("records-per-request-avg", + this.recordsFetched.add(metrics.metricName("records-per-request-avg", this.metricGrpName, - "The average number of records in each request", - tags), new Avg()); - this.recordsFetched.add(new MetricName("records-consumed-rate", + "The average number of records in each request"), new Avg()); + this.recordsFetched.add(metrics.metricName("records-consumed-rate", this.metricGrpName, - "The average number of records consumed per second", - tags), new Rate()); + "The average number of records consumed per second"), new Rate()); this.fetchLatency = metrics.sensor("fetch-latency"); - this.fetchLatency.add(new MetricName("fetch-latency-avg", + this.fetchLatency.add(metrics.metricName("fetch-latency-avg", this.metricGrpName, - "The average time taken for a fetch request.", - tags), new Avg()); - this.fetchLatency.add(new MetricName("fetch-latency-max", + "The average time taken for a fetch request."), new Avg()); + this.fetchLatency.add(metrics.metricName("fetch-latency-max", this.metricGrpName, - "The max time taken for any fetch request.", - tags), new Max()); - this.fetchLatency.add(new MetricName("fetch-rate", + "The max time taken for any fetch request."), new Max()); + this.fetchLatency.add(metrics.metricName("fetch-rate", this.metricGrpName, - "The number of fetch requests per second.", - tags), new Rate(new Count())); + "The number of fetch requests per second."), new Rate(new Count())); this.recordsFetchLag = metrics.sensor("records-lag"); - this.recordsFetchLag.add(new MetricName("records-lag-max", + this.recordsFetchLag.add(metrics.metricName("records-lag-max", this.metricGrpName, - "The maximum lag in terms of number of records for any partition in this window", - tags), new Max()); + "The maximum lag in terms of number of records for any partition in this window"), new Max()); this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time"); - this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg", + this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg", this.metricGrpName, - "The average throttle time in ms", - tags), new Avg()); + "The average throttle time in ms"), new Avg()); - this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max", + this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max", this.metricGrpName, - "The maximum throttle time in ms", - tags), new Max()); + "The maximum throttle time in ms"), new Max()); } public void recordTopicFetchMetrics(String topic, int bytes, int records) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 38fb6a654b20d..49560b5cacb3d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -203,12 +203,14 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial this.producerConfig = config; this.time = new SystemTime(); - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); + Map metricTags = new LinkedHashMap(); + metricTags.put("client-id", clientId); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + .tags(metricTags); List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); @@ -256,21 +258,18 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } - Map metricTags = new LinkedHashMap(); - metricTags.put("client-id", clientId); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, metrics, - time, - metricTags); + time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient client = new NetworkClient( - new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder), + new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 2a45075189466..f881e62c333ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -19,7 +19,6 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Deque; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -62,9 +61,8 @@ public final class BufferPool { * @param metrics instance of Metrics * @param time time instance * @param metricGrpName logical group name for metrics - * @param metricTags additional key/val attributes for metrics */ - public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , String metricGrpName , Map metricTags) { + public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) { this.poolableSize = poolableSize; this.lock = new ReentrantLock(); this.free = new ArrayDeque(); @@ -74,10 +72,9 @@ public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , St this.metrics = metrics; this.time = time; this.waitTime = this.metrics.sensor("bufferpool-wait-time"); - MetricName metricName = new MetricName("bufferpool-wait-ratio", - metricGrpName, - "The fraction of time an appender waits for space allocation.", - metricTags); + MetricName metricName = metrics.metricName("bufferpool-wait-ratio", + metricGrpName, + "The fraction of time an appender waits for space allocation."); this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d4a8a230befac..4b394f999f5c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -85,7 +85,6 @@ public final class RecordAccumulator { * exhausting all retries in a short period of time. * @param metrics The metrics * @param time The time instance to use - * @param metricTags additional key/value attributes of the metric */ public RecordAccumulator(int batchSize, long totalSize, @@ -93,8 +92,7 @@ public RecordAccumulator(int batchSize, long lingerMs, long retryBackoffMs, Metrics metrics, - Time time, - Map metricTags) { + Time time) { this.drainIndex = 0; this.closed = false; this.flushesInProgress = new AtomicInteger(0); @@ -105,14 +103,14 @@ public RecordAccumulator(int batchSize, this.retryBackoffMs = retryBackoffMs; this.batches = new CopyOnWriteMap>(); String metricGrpName = "producer-metrics"; - this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags); + this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName); this.incomplete = new IncompleteRecordBatches(); this.time = time; - registerMetrics(metrics, metricGrpName, metricTags); + registerMetrics(metrics, metricGrpName); } - private void registerMetrics(Metrics metrics, String metricGrpName, Map metricTags) { - MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags); + private void registerMetrics(Metrics metrics, String metricGrpName) { + MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records"); Measurable waitingThreads = new Measurable() { public double measure(MetricConfig config, long now) { return free.queued(); @@ -120,7 +118,7 @@ public double measure(MetricConfig config, long now) { }; metrics.addMetric(metricName, waitingThreads); - metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags); + metricName = metrics.metricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used)."); Measurable totalBytes = new Measurable() { public double measure(MetricConfig config, long now) { return free.totalMemory(); @@ -128,7 +126,7 @@ public double measure(MetricConfig config, long now) { }; metrics.addMetric(metricName, totalBytes); - metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags); + metricName = metrics.metricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list)."); Measurable availableBytes = new Measurable() { public double measure(MetricConfig config, long now) { return free.availableMemory(); @@ -137,7 +135,7 @@ public double measure(MetricConfig config, long now) { metrics.addMetric(metricName, availableBytes); Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records"); - metricName = new MetricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags); + metricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion"); bufferExhaustedRecordSensor.add(metricName, new Rate()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index cada6266d57e0..b8215e1895da3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -367,65 +367,63 @@ private class SenderMetrics { public SenderMetrics(Metrics metrics) { this.metrics = metrics; - Map metricTags = new LinkedHashMap(); - metricTags.put("client-id", clientId); String metricGrpName = "producer-metrics"; this.batchSizeSensor = metrics.sensor("batch-size"); - MetricName m = new MetricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.", metricTags); + MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request."); this.batchSizeSensor.add(m, new Avg()); - m = new MetricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request.", metricTags); + m = metrics.metricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request."); this.batchSizeSensor.add(m, new Max()); this.compressionRateSensor = metrics.sensor("compression-rate"); - m = new MetricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches.", metricTags); + m = metrics.metricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches."); this.compressionRateSensor.add(m, new Avg()); this.queueTimeSensor = metrics.sensor("queue-time"); - m = new MetricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator.", metricTags); + m = metrics.metricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator."); this.queueTimeSensor.add(m, new Avg()); - m = new MetricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator.", metricTags); + m = metrics.metricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator."); this.queueTimeSensor.add(m, new Max()); this.requestTimeSensor = metrics.sensor("request-time"); - m = new MetricName("request-latency-avg", metricGrpName, "The average request latency in ms", metricTags); + m = metrics.metricName("request-latency-avg", metricGrpName, "The average request latency in ms"); this.requestTimeSensor.add(m, new Avg()); - m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags); + m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request latency in ms"); this.requestTimeSensor.add(m, new Max()); this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time"); - m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags); + m = metrics.metricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms"); this.produceThrottleTimeSensor.add(m, new Avg()); - m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags); + m = metrics.metricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms"); this.produceThrottleTimeSensor.add(m, new Max()); this.recordsPerRequestSensor = metrics.sensor("records-per-request"); - m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags); + m = metrics.metricName("record-send-rate", metricGrpName, "The average number of records sent per second."); this.recordsPerRequestSensor.add(m, new Rate()); - m = new MetricName("records-per-request-avg", metricGrpName, "The average number of records per request.", metricTags); + m = metrics.metricName("records-per-request-avg", metricGrpName, "The average number of records per request."); this.recordsPerRequestSensor.add(m, new Avg()); this.retrySensor = metrics.sensor("record-retries"); - m = new MetricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends", metricTags); + m = metrics.metricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends"); this.retrySensor.add(m, new Rate()); this.errorSensor = metrics.sensor("errors"); - m = new MetricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors", metricTags); + m = metrics.metricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors"); this.errorSensor.add(m, new Rate()); this.maxRecordSizeSensor = metrics.sensor("record-size-max"); - m = new MetricName("record-size-max", metricGrpName, "The maximum record size", metricTags); + m = metrics.metricName("record-size-max", metricGrpName, "The maximum record size"); this.maxRecordSizeSensor.add(m, new Max()); - m = new MetricName("record-size-avg", metricGrpName, "The average record size", metricTags); + m = metrics.metricName("record-size-avg", metricGrpName, "The average record size"); this.maxRecordSizeSensor.add(m, new Avg()); - m = new MetricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.", metricTags); + m = metrics.metricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response."); this.metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { return client.inFlightRequestCount(); } }); - m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.", metricTags); + m = metrics.metricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used."); metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { return (now - metadata.lastSuccessfulUpdate()) / 1000.0; @@ -440,32 +438,31 @@ public void maybeRegisterTopicMetrics(String topic) { Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); if (topicRecordCount == null) { Map metricTags = new LinkedHashMap(); - metricTags.put("client-id", clientId); metricTags.put("topic", topic); String metricGrpName = "producer-topic-metrics"; topicRecordCount = this.metrics.sensor(topicRecordsCountName); - MetricName m = new MetricName("record-send-rate", metricGrpName , metricTags); + MetricName m = this.metrics.metricName("record-send-rate", metricGrpName, metricTags); topicRecordCount.add(m, new Rate()); String topicByteRateName = "topic." + topic + ".bytes"; Sensor topicByteRate = this.metrics.sensor(topicByteRateName); - m = new MetricName("byte-rate", metricGrpName , metricTags); + m = this.metrics.metricName("byte-rate", metricGrpName, metricTags); topicByteRate.add(m, new Rate()); String topicCompressionRateName = "topic." + topic + ".compression-rate"; Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName); - m = new MetricName("compression-rate", metricGrpName , metricTags); + m = this.metrics.metricName("compression-rate", metricGrpName, metricTags); topicCompressionRate.add(m, new Avg()); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.sensor(topicRetryName); - m = new MetricName("record-retry-rate", metricGrpName , metricTags); + m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags); topicRetrySensor.add(m, new Rate()); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.sensor(topicErrorName); - m = new MetricName("record-error-rate", metricGrpName , metricTags); + m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags); topicErrorSensor.add(m, new Rate()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java index ee50f332d3e77..2b810305910b8 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricName.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.utils.Utils; /** - * The MetricName class encapsulates a metric's name, logical group and its related attributes + * The MetricName class encapsulates a metric's name, logical group and its related attributes. It should be constructed using metrics.MetricName(...). *

* This class captures the following parameters *

@@ -31,23 +31,27 @@
  * 

* Ex: standard JMX MBean can be constructed like domainName:type=group,key1=val1,key2=val2 *

+ * * Usage looks something like this: *

{@code
  * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
  *
  * Map metricTags = new LinkedHashMap();
  * metricTags.put("client-id", "producer-1");
  * metricTags.put("topic", "topic");
  *
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
+ * MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+ * Metrics metrics = new Metrics(metricConfig); // this is the global repository of metrics and sensors
+ *
+ * Sensor sensor = metrics.sensor("message-sizes");
+ *
+ * MetricName metricName = metrics.metricName("message-size-avg", "producer-metrics", "average message size");
  * sensor.add(metricName, new Avg());
  *
- * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
+ * metricName = metrics.metricName("message-size-max", "producer-metrics");
  * sensor.add(metricName, new Max());
  *
- * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
+ * metricName = metrics.metricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
  * sensor.add(metricName, new Min());
  *
  * // as messages are sent we record the sizes
@@ -63,6 +67,8 @@ public final class MetricName {
     private int hash = 0;
 
     /**
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String, Map)}
+     *
      * @param name        The name of the metric
      * @param group       logical group name of the metrics to which this metric belongs
      * @param description A human-readable description to include in the metric
@@ -76,11 +82,15 @@ public MetricName(String name, String group, String description, Map getTags(String... keyValue) {
     }
 
     /**
+     * @deprecated This method will be removed in a future release.
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, Map)}
+     *
      * @param name  The name of the metric
      * @param group logical group name of the metrics to which this metric belongs
      * @param tags  key/value attributes of the metric
      */
+    @Deprecated
     public MetricName(String name, String group, Map tags) {
         this(name, group, "", tags);
     }
 
     /**
+     * @deprecated This method will be removed in a future release.
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String, String)}
+     *
      * @param name        The name of the metric
      * @param group       logical group name of the metrics to which this metric belongs
      * @param description A human-readable description to include in the metric
      */
+    @Deprecated
     public MetricName(String name, String group, String description) {
         this(name, group, description, new HashMap());
     }
 
     /**
+     * @deprecated This method will be removed in a future release.
+     * Please create MetricName by method {@link org.apache.kafka.common.metrics.Metrics#metricName(String, String)}
+     *
      * @param name  The name of the metric
      * @param group logical group name of the metrics to which this metric belongs
      */
+    @Deprecated
     public MetricName(String name, String group) {
         this(name, group, "", new HashMap());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
index dfa1b0a11042a..6bd351df9a7c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -28,6 +30,7 @@ public class MetricConfig {
     private long eventWindow;
     private long timeWindowMs;
     private TimeUnit unit;
+    private Map tags;
 
     public MetricConfig() {
         super();
@@ -36,6 +39,7 @@ public MetricConfig() {
         this.eventWindow = Long.MAX_VALUE;
         this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
         this.unit = TimeUnit.SECONDS;
+        this.tags = new LinkedHashMap<>();
     }
 
     public Quota quota() {
@@ -65,6 +69,15 @@ public MetricConfig timeWindow(long window, TimeUnit unit) {
         return this;
     }
 
+    public Map tags() {
+        return this.tags;
+    }
+
+    public MetricConfig tags(Map tags) {
+        this.tags = tags;
+        return this;
+    }
+
     public int samples() {
         return this.samples;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index fdb7dacaa5f1a..842e0f7a98342 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -15,6 +15,8 @@
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -81,6 +83,15 @@ public Metrics(Time time) {
     }
 
     /**
+     * Create a metrics repository with no metric reporters and the given default configuration.
+     * Expiration of Sensors is disabled.
+     */
+    public Metrics(MetricConfig defaultConfig, Time time) {
+      this(defaultConfig, new ArrayList(0), time);
+    }
+
+
+  /**
      * Create a metrics repository with no reporters and the given default config. This config will be used for any
      * metric that doesn't override its own config. Expiration of Sensors is disabled.
      * @param defaultConfig The default config to use for all metrics that don't override their config
@@ -130,6 +141,90 @@ public Thread newThread(Runnable runnable) {
         } else {
             this.metricsScheduler = null;
         }
+
+        addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
+            new Measurable() {
+                @Override
+                public double measure(MetricConfig config, long now) {
+                    return metrics.size();
+                }
+            });
+    }
+
+    /**
+     * Create a MetricName with the given name, group, description and tags, plus default tags specified in the metric
+     * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
+     *
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     * @param tags        additional key/value attributes of the metric
+     */
+    public MetricName metricName(String name, String group, String description, Map tags) {
+        Map combinedTag = new LinkedHashMap<>(config.tags());
+        combinedTag.putAll(tags);
+        return new MetricName(name, group, description, combinedTag);
+    }
+
+    /**
+     * Create a MetricName with the given name, group, description, and default tags
+     * specified in the metric configuration.
+     *
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     */
+    public MetricName metricName(String name, String group, String description) {
+        return metricName(name, group, description, new HashMap());
+    }
+
+    /**
+     * Create a MetricName with the given name, group and default tags specified in the metric configuration.
+     *
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     */
+    public MetricName metricName(String name, String group) {
+        return metricName(name, group, "", new HashMap());
+    }
+
+    /**
+     * Create a MetricName with the given name, group, description, and keyValue as tags,  plus default tags specified in the metric
+     * configuration. Tag in keyValue takes precedence if the same tag key is specified in the default metric configuration.
+     *
+     * @param name          The name of the metric
+     * @param group         logical group name of the metrics to which this metric belongs
+     * @param description   A human-readable description to include in the metric
+     * @param keyValue      additional key/value attributes of the metric (must come in pairs)
+     */
+    public MetricName metricName(String name, String group, String description, String... keyValue) {
+        return metricName(name, group, description, getTags(keyValue));
+    }
+
+    /**
+     * Create a MetricName with the given name, group and tags, plus default tags specified in the metric
+     * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
+     *
+     * @param name  The name of the metric
+     * @param group logical group name of the metrics to which this metric belongs
+     * @param tags  key/value attributes of the metric
+     */
+    public MetricName metricName(String name, String group, Map tags) {
+        return metricName(name, group, "", tags);
+    }
+
+    private static Map getTags(String... keyValue) {
+        if ((keyValue.length % 2) != 0)
+            throw new IllegalArgumentException("keyValue needs to be specified in pairs");
+        Map tags = new HashMap();
+
+        for (int i = 0; i < keyValue.length; i += 2)
+            tags.put(keyValue[i], keyValue[i + 1]);
+        return tags;
+    }
+
+    public MetricConfig config() {
+        return config;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 639a2bebc2fb0..387c063d9da2d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -118,8 +118,8 @@ public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, T
         this.metricsPerConnection = metricsPerConnection;
     }
 
-    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map metricTags, ChannelBuilder channelBuilder) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder);
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap(), true, channelBuilder);
     }
 
     /**
@@ -568,48 +568,48 @@ public SelectorMetrics(Metrics metrics) {
             }
 
             this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
-            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
+            MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
             this.connectionClosed.add(metricName, new Rate());
 
             this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
-            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
+            metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
             this.connectionCreated.add(metricName, new Rate());
 
             this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
-            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
+            metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
             bytesTransferred.add(metricName, new Rate(new Count()));
 
             this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
+            metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
             this.bytesSent.add(metricName, new Rate());
-            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
+            metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
             this.bytesSent.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
+            metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
             this.bytesSent.add(metricName, new Avg());
-            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
+            metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
             this.bytesSent.add(metricName, new Max());
 
             this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
+            metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
             this.bytesReceived.add(metricName, new Rate());
-            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
+            metricName = metrics.metricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
             this.bytesReceived.add(metricName, new Rate(new Count()));
 
             this.selectTime = sensor("select-time:" + tagsSuffix.toString());
-            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
+            metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
             this.selectTime.add(metricName, new Rate(new Count()));
-            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
+            metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
             this.selectTime.add(metricName, new Avg());
-            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
+            metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
             this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
             this.ioTime = sensor("io-time:" + tagsSuffix.toString());
-            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
+            metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
             this.ioTime.add(metricName, new Avg());
-            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
+            metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
             this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
 
-            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
+            metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
             topLevelMetricNames.add(metricName);
             this.metrics.addMetric(metricName, new Measurable() {
                 public double measure(MetricConfig config, long now) {
@@ -637,27 +637,27 @@ public void maybeRegisterConnectionMetrics(String connectionId) {
                     tags.put("node-id", "node-" + connectionId);
 
                     nodeRequest = sensor(nodeRequestName);
-                    MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
+                    MetricName metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, tags);
                     nodeRequest.add(metricName, new Rate());
-                    metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
+                    metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
                     nodeRequest.add(metricName, new Rate(new Count()));
-                    metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
+                    metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
                     nodeRequest.add(metricName, new Avg());
-                    metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
+                    metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
                     nodeRequest.add(metricName, new Max());
 
                     String nodeResponseName = "node-" + connectionId + ".bytes-received";
                     Sensor nodeResponse = sensor(nodeResponseName);
-                    metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
+                    metricName = metrics.metricName("incoming-byte-rate", metricGrpName, tags);
                     nodeResponse.add(metricName, new Rate());
-                    metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
+                    metricName = metrics.metricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
                     nodeResponse.add(metricName, new Rate(new Count()));
 
                     String nodeTimeName = "node-" + connectionId + ".latency";
                     Sensor nodeRequestTime = sensor(nodeTimeName);
-                    metricName = new MetricName("request-latency-avg", metricGrpName, tags);
+                    metricName = metrics.metricName("request-latency-avg", metricGrpName, tags);
                     nodeRequestTime.add(metricName, new Avg());
-                    metricName = new MetricName("request-latency-max", metricGrpName, tags);
+                    metricName = metrics.metricName("request-latency-max", metricGrpName, tags);
                     nodeRequestTime.add(metricName, new Max());
                 }
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9f9682a74d7d5..3ae1a362b2645 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -60,7 +60,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,7 +88,6 @@ public class ConsumerCoordinatorTest {
     private SubscriptionState subscriptions;
     private Metadata metadata;
     private Metrics metrics;
-    private Map metricTags = new LinkedHashMap<>();
     private ConsumerNetworkClient consumerClient;
     private MockRebalanceListener rebalanceListener;
     private MockCommitCallback defaultOffsetCommitCallback;
@@ -109,7 +107,6 @@ public void setup() {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-
         this.coordinator = buildCoordinator(metrics, assignors);
     }
 
@@ -912,7 +909,6 @@ private ConsumerCoordinator buildCoordinator(Metrics metrics, List metricTags = new LinkedHashMap();
     private static final double EPSILON = 0.0001;
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
 
@@ -484,8 +482,8 @@ public void testQuotaMetrics() throws Exception {
         }
 
         Map allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(new MetricName("fetch-throttle-time-avg", metricGroup, "", metricTags));
-        KafkaMetric maxMetric = allMetrics.get(new MetricName("fetch-throttle-time-max", metricGroup, "", metricTags));
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
         assertEquals(200, avgMetric.value(), EPSILON);
         assertEquals(300, maxMetric.value(), EPSILON);
     }
@@ -527,7 +525,6 @@ private  Fetcher createFetcher(SubscriptionState subscriptions,
                 subscriptions,
                 metrics,
                 "consumer" + groupId,
-                metricTags,
                 time,
                 retryBackoffMs);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index f8567e9f3a0a7..b103bee294d5d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -25,9 +25,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -38,7 +36,6 @@ public class BufferPoolTest {
     private Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs =  2000;
     String metricGroup = "TestMetrics";
-    Map metricTags = new LinkedHashMap();
 
     @After
     public void teardown() {
@@ -52,7 +49,7 @@ public void teardown() {
     public void testSimple() throws Exception {
         long totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup);
         ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@@ -79,7 +76,7 @@ public void testSimple() throws Exception {
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup);
         ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
@@ -91,7 +88,7 @@ public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup);
         ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@@ -140,7 +137,7 @@ public void run() {
      */
     @Test
     public void testBlockTimeout() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
         pool.allocate(1, maxBlockTimeMs);
         try {
             pool.allocate(2, maxBlockTimeMs);
@@ -159,7 +156,7 @@ public void testStressfulSituation() throws Exception {
         final int iterations = 50000;
         final int poolableSize = 1024;
         final long totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup, metricTags);
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup);
         List threads = new ArrayList();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 4674a9140e71f..723e450660729 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -22,7 +22,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,7 +64,6 @@ public class RecordAccumulatorTest {
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
     private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.emptySet());
     private Metrics metrics = new Metrics(time);
-    Map metricTags = new LinkedHashMap();
     private final long maxBlockTimeMs = 1000;
 
     @After
@@ -76,7 +74,7 @@ public void teardown() {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time,  metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, null, maxBlockTimeMs);
@@ -100,7 +98,7 @@ public void testFull() throws Exception {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
         accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -108,7 +106,7 @@ public void testAppendLarge() throws Exception {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
         accum.append(tp1, key, value, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -126,7 +124,7 @@ public void testLinger() throws Exception {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
         int appends = 1024 / msgSize + 1;
         List partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -145,7 +143,7 @@ public void testStressfulSituation() throws Exception {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
         List threads = new ArrayList();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -185,7 +183,7 @@ public void run() {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,  CompressionType.NONE, lingerMs, 100L, metrics, time);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
 
@@ -219,7 +217,7 @@ public void testNextReadyCheckDelay() throws Exception {
     public void testRetryBackoff() throws Exception {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
 
         long now = time.milliseconds();
         accum.append(tp1, key, value, null, maxBlockTimeMs);
@@ -256,7 +254,7 @@ public void testRetryBackoff() throws Exception {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
         for (int i = 0; i < 100; i++)
             accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -280,7 +278,7 @@ public void testFlush() throws Exception {
     public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -303,7 +301,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     public void testExpiredBatches() throws InterruptedException {
         Time time = new SystemTime();
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, null, maxBlockTimeMs);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index bcc618aad69b7..dc61fc2e2c157 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -57,24 +58,29 @@ public class SenderTest {
     private int batchSize = 16 * 1024;
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
-    private Metrics metrics = new Metrics(time);
-    Map metricTags = new LinkedHashMap();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, metricTags);
-    private Sender sender = new Sender(client,
-                                       metadata,
-                                       this.accumulator,
-                                       MAX_REQUEST_SIZE,
-                                       ACKS_ALL,
-                                       MAX_RETRIES,
-                                       metrics,
-                                       time,
-                                       CLIENT_ID,
-                                       REQUEST_TIMEOUT);
+    private Metrics metrics = null;
+    private RecordAccumulator accumulator = null;
+    private Sender sender = null;
 
     @Before
     public void setup() {
-        metadata.update(cluster, time.milliseconds());
+        Map metricTags = new LinkedHashMap();
         metricTags.put("client-id", CLIENT_ID);
+        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+        metrics = new Metrics(metricConfig, time);
+        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time);
+        sender = new Sender(client,
+                            metadata,
+                            this.accumulator,
+                            MAX_REQUEST_SIZE,
+                            ACKS_ALL,
+                            MAX_RETRIES,
+                            metrics,
+                            time,
+                            CLIENT_ID,
+                            REQUEST_TIMEOUT);
+
+        metadata.update(cluster, time.milliseconds());
     }
 
     @After
@@ -110,8 +116,8 @@ public void testQuotaMetrics() throws Exception {
             sender.run(time.milliseconds());
         }
         Map allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", metricTags));
-        KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", metricTags));
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
         assertEquals(200, avgMetric.value(), EPS);
         assertEquals(300, maxMetric.value(), EPS);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 90cd76f923340..e07e646334ed7 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.metrics;
 
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.junit.Test;
@@ -29,11 +28,11 @@ public void testJmxRegistration() throws Exception {
         try {
             metrics.addReporter(new JmxReporter());
             Sensor sensor = metrics.sensor("kafka.requests");
-            sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());
-            sensor.add(new MetricName("pack.bean2.total", "grp2"), new Total());
+            sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
+            sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
             Sensor sensor2 = metrics.sensor("kafka.blah");
-            sensor2.add(new MetricName("pack.bean1.some", "grp1"), new Total());
-            sensor2.add(new MetricName("pack.bean2.some", "grp1"), new Total());
+            sensor2.add(metrics.metricName("pack.bean1.some", "grp1"), new Total());
+            sensor2.add(metrics.metricName("pack.bean2.some", "grp1"), new Total());
         } finally {
             metrics.close();
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index d465c98b6eb60..d7723aebada23 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -59,16 +59,16 @@ public void tearDown() {
 
     @Test
     public void testMetricName() {
-        MetricName n1 = new MetricName("name", "group", "description", "key1", "value1", "key2", "value2");
+        MetricName n1 = metrics.metricName("name", "group", "description", "key1", "value1", "key2", "value2");
         Map tags = new HashMap();
         tags.put("key1", "value1");
         tags.put("key2", "value2");
-        MetricName n2 = new MetricName("name", "group", "description", tags);
+        MetricName n2 = metrics.metricName("name", "group", "description", tags);
         assertEquals("metric names created in two different ways should be equal", n1, n2);
 
         try {
-            new MetricName("name", "group", "description", "key1");
-            fail("Creating MetricName with an old number of keyValue should fail");
+            metrics.metricName("name", "group", "description", "key1");
+            fail("Creating MetricName with an odd number of keyValue should fail");
         } catch (IllegalArgumentException e) {
             // this is expected
         }
@@ -78,20 +78,20 @@ public void testMetricName() {
     public void testSimpleStats() throws Exception {
         ConstantMeasurable measurable = new ConstantMeasurable();
 
-        metrics.addMetric(new MetricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
+        metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
         Sensor s = metrics.sensor("test.sensor");
-        s.add(new MetricName("test.avg", "grp1"), new Avg());
-        s.add(new MetricName("test.max", "grp1"), new Max());
-        s.add(new MetricName("test.min", "grp1"), new Min());
-        s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
-        s.add(new MetricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
-        s.add(new MetricName("test.count", "grp1"), new Count());
+        s.add(metrics.metricName("test.avg", "grp1"), new Avg());
+        s.add(metrics.metricName("test.max", "grp1"), new Max());
+        s.add(metrics.metricName("test.min", "grp1"), new Min());
+        s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
+        s.add(metrics.metricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
+        s.add(metrics.metricName("test.count", "grp1"), new Count());
         s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
-                             new Percentile(new MetricName("test.median", "grp1"), 50.0),
-                             new Percentile(new MetricName("test.perc99_9", "grp1"), 99.9)));
+                             new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
+                             new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));
 
         Sensor s2 = metrics.sensor("test.sensor2");
-        s2.add(new MetricName("s2.total", "grp1"), new Total());
+        s2.add(metrics.metricName("s2.total", "grp1"), new Total());
         s2.record(5.0);
 
         int sum = 0;
@@ -103,38 +103,38 @@ public void testSimpleStats() throws Exception {
         // prior to any time passing
         double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
         assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
-                     metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
+                     metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
 
         // pretend 2 seconds passed...
         long sleepTimeMs = 2;
         time.sleep(sleepTimeMs * 1000);
         elapsedSecs += sleepTimeMs;
 
-        assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS);
-        assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS);
-        assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS);
-        assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS);
+        assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total", "grp1")).value(), EPS);
+        assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg", "grp1")).value(), EPS);
+        assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max", "grp1")).value(), EPS);
+        assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName("test.min", "grp1")).value(), EPS);
         assertEquals("Rate(0...9) = 1.40625",
-                     sum / elapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS);
+                     sum / elapsedSecs, metrics.metrics().get(metrics.metricName("test.rate", "grp1")).value(), EPS);
         assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
                      count / elapsedSecs,
-                     metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
+                     metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
         assertEquals("Count(0...9) = 10",
-                     (double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS);
+                     (double) count, metrics.metrics().get(metrics.metricName("test.count", "grp1")).value(), EPS);
     }
 
     @Test
     public void testHierarchicalSensors() {
         Sensor parent1 = metrics.sensor("test.parent1");
-        parent1.add(new MetricName("test.parent1.count", "grp1"), new Count());
+        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
         Sensor parent2 = metrics.sensor("test.parent2");
-        parent2.add(new MetricName("test.parent2.count", "grp1"), new Count());
+        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
         Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
-        child1.add(new MetricName("test.child1.count", "grp1"), new Count());
+        child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
         Sensor child2 = metrics.sensor("test.child2", parent1);
-        child2.add(new MetricName("test.child2.count", "grp1"), new Count());
+        child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
         Sensor grandchild = metrics.sensor("test.grandchild", child1);
-        grandchild.add(new MetricName("test.grandchild.count", "grp1"), new Count());
+        grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new Count());
 
         /* increment each sensor one time */
         parent1.record();
@@ -167,75 +167,76 @@ public void testBadSensorHierarchy() {
 
     @Test
     public void testRemoveSensor() {
+        int size = metrics.metrics().size();
         Sensor parent1 = metrics.sensor("test.parent1");
-        parent1.add(new MetricName("test.parent1.count", "grp1"), new Count());
+        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
         Sensor parent2 = metrics.sensor("test.parent2");
-        parent2.add(new MetricName("test.parent2.count", "grp1"), new Count());
+        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
         Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
-        child1.add(new MetricName("test.child1.count", "grp1"), new Count());
+        child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
         Sensor child2 = metrics.sensor("test.child2", parent2);
-        child2.add(new MetricName("test.child2.count", "grp1"), new Count());
+        child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
         Sensor grandChild1 = metrics.sensor("test.gchild2", child2);
-        grandChild1.add(new MetricName("test.gchild2.count", "grp1"), new Count());
+        grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new Count());
 
         Sensor sensor = metrics.getSensor("test.parent1");
         assertNotNull(sensor);
         metrics.removeSensor("test.parent1");
         assertNull(metrics.getSensor("test.parent1"));
-        assertNull(metrics.metrics().get(new MetricName("test.parent1.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.parent1.count", "grp1")));
         assertNull(metrics.getSensor("test.child1"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.child1.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.child1.count", "grp1")));
 
         sensor = metrics.getSensor("test.gchild2");
         assertNotNull(sensor);
         metrics.removeSensor("test.gchild2");
         assertNull(metrics.getSensor("test.gchild2"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.gchild2.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.gchild2.count", "grp1")));
 
         sensor = metrics.getSensor("test.child2");
         assertNotNull(sensor);
         metrics.removeSensor("test.child2");
         assertNull(metrics.getSensor("test.child2"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.child2.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.child2.count", "grp1")));
 
         sensor = metrics.getSensor("test.parent2");
         assertNotNull(sensor);
         metrics.removeSensor("test.parent2");
         assertNull(metrics.getSensor("test.parent2"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.parent2.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.parent2.count", "grp1")));
 
-        assertEquals(0, metrics.metrics().size());
+        assertEquals(size, metrics.metrics().size());
     }
 
     @Test
     public void testRemoveInactiveMetrics() {
         Sensor s1 = metrics.sensor("test.s1", null, 1);
-        s1.add(new MetricName("test.s1.count", "grp1"), new Count());
+        s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
 
         Sensor s2 = metrics.sensor("test.s2", null, 3);
-        s2.add(new MetricName("test.s2.count", "grp1"), new Count());
+        s2.add(metrics.metricName("test.s2.count", "grp1"), new Count());
 
         Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask();
         purger.run();
         assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
         assertNotNull("MetricName test.s1.count must be present",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
         assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
         assertNotNull("MetricName test.s2.count must be present",
-                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
 
         time.sleep(1001);
         purger.run();
         assertNull("Sensor test.s1 should have been purged", metrics.getSensor("test.s1"));
         assertNull("MetricName test.s1.count should have been purged",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
         assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
         assertNotNull("MetricName test.s2.count must be present",
-                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
 
         // record a value in sensor s2. This should reset the clock for that sensor.
         // It should not get purged at the 3 second mark after creation
@@ -244,36 +245,37 @@ public void testRemoveInactiveMetrics() {
         purger.run();
         assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
         assertNotNull("MetricName test.s2.count must be present",
-                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
 
         // After another 1 second sleep, the metric should be purged
         time.sleep(1000);
         purger.run();
         assertNull("Sensor test.s2 should have been purged", metrics.getSensor("test.s1"));
         assertNull("MetricName test.s2.count should have been purged",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
 
         // After purging, it should be possible to recreate a metric
         s1 = metrics.sensor("test.s1", null, 1);
-        s1.add(new MetricName("test.s1.count", "grp1"), new Count());
+        s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
         assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
         assertNotNull("MetricName test.s1.count must be present",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
     }
 
     @Test
     public void testRemoveMetric() {
-        metrics.addMetric(new MetricName("test1", "grp1"), new Count());
-        metrics.addMetric(new MetricName("test2", "grp1"), new Count());
+        int size = metrics.metrics().size();
+        metrics.addMetric(metrics.metricName("test1", "grp1"), new Count());
+        metrics.addMetric(metrics.metricName("test2", "grp1"), new Count());
 
-        assertNotNull(metrics.removeMetric(new MetricName("test1", "grp1")));
-        assertNull(metrics.metrics().get(new MetricName("test1", "grp1")));
-        assertNotNull(metrics.metrics().get(new MetricName("test2", "grp1")));
+        assertNotNull(metrics.removeMetric(metrics.metricName("test1", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test1", "grp1")));
+        assertNotNull(metrics.metrics().get(metrics.metricName("test2", "grp1")));
 
-        assertNotNull(metrics.removeMetric(new MetricName("test2", "grp1")));
-        assertNull(metrics.metrics().get(new MetricName("test2", "grp1")));
+        assertNotNull(metrics.removeMetric(metrics.metricName("test2", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test2", "grp1")));
 
-        assertEquals(0, metrics.metrics().size());
+        assertEquals(size, metrics.metrics().size());
     }
 
     @Test
@@ -313,15 +315,15 @@ public void testOldDataHasNoEffect() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testDuplicateMetricName() {
-        metrics.sensor("test").add(new MetricName("test", "grp1"), new Avg());
-        metrics.sensor("test2").add(new MetricName("test", "grp1"), new Total());
+        metrics.sensor("test").add(metrics.metricName("test", "grp1"), new Avg());
+        metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new Total());
     }
 
     @Test
     public void testQuotas() {
         Sensor sensor = metrics.sensor("test");
-        sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
-        sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
+        sensor.add(metrics.metricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
+        sensor.add(metrics.metricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
         sensor.record(5.0);
         try {
             sensor.record(1.0);
@@ -329,7 +331,7 @@ public void testQuotas() {
         } catch (QuotaViolationException e) {
             // this is good
         }
-        assertEquals(6.0, metrics.metrics().get(new MetricName("test1.total", "grp1")).value(), EPS);
+        assertEquals(6.0, metrics.metrics().get(metrics.metricName("test1.total", "grp1")).value(), EPS);
         sensor.record(-6.0);
         try {
             sensor.record(-1.0);
@@ -358,15 +360,15 @@ public void testPercentiles() {
                                             0.0,
                                             100.0,
                                             BucketSizing.CONSTANT,
-                                            new Percentile(new MetricName("test.p25", "grp1"), 25),
-                                            new Percentile(new MetricName("test.p50", "grp1"), 50),
-                                            new Percentile(new MetricName("test.p75", "grp1"), 75));
+                                            new Percentile(metrics.metricName("test.p25", "grp1"), 25),
+                                            new Percentile(metrics.metricName("test.p50", "grp1"), 50),
+                                            new Percentile(metrics.metricName("test.p75", "grp1"), 75));
         MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
         Sensor sensor = metrics.sensor("test", config);
         sensor.add(percs);
-        Metric p25 = this.metrics.metrics().get(new MetricName("test.p25", "grp1"));
-        Metric p50 = this.metrics.metrics().get(new MetricName("test.p50", "grp1"));
-        Metric p75 = this.metrics.metrics().get(new MetricName("test.p75", "grp1"));
+        Metric p25 = this.metrics.metrics().get(metrics.metricName("test.p25", "grp1"));
+        Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
+        Metric p75 = this.metrics.metrics().get(metrics.metricName("test.p75", "grp1"));
 
         // record two windows worth of sequential values
         for (int i = 0; i < buckets; i++)
@@ -389,7 +391,7 @@ public void testRateWindowing() throws Exception {
         // Use the default time window. Set 3 samples
         MetricConfig cfg = new MetricConfig().samples(3);
         Sensor s = metrics.sensor("test.sensor", cfg);
-        s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
+        s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
 
         int sum = 0;
         int count = cfg.samples() - 1;
@@ -406,7 +408,7 @@ public void testRateWindowing() throws Exception {
         // prior to any time passing
         double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
 
-        KafkaMetric km = metrics.metrics().get(new MetricName("test.rate", "grp1"));
+        KafkaMetric km = metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
         assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
         assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
                 ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 18fd080a2ef6a..bce74e1a68ab2 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -54,7 +54,7 @@ public void setup() throws Exception {
         this.channelBuilder = new PlaintextChannelBuilder();
         this.channelBuilder.configure(configs);
         this.metrics = new Metrics();
-        this.selector = new Selector(5000, this.metrics, time, "MetricGroup", new LinkedHashMap(), channelBuilder);
+        this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder);
     }
 
     @After
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index a442ea00049da..bbc8fe11087b0 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -18,7 +18,6 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.io.File;
@@ -56,7 +55,7 @@ public void setup() throws Exception {
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
         this.metrics = new Metrics();
-        this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap(), channelBuilder);
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder);
     }
 
     @After
@@ -84,7 +83,7 @@ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id
             }
         };
         channelBuilder.configure(sslClientConfigs);
-        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap(), channelBuilder);
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", channelBuilder);
         try {
             int reqs = 500;
             String node = "0";
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 2b5d26b50bd7d..34ea136ab85ef 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -21,7 +21,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -75,7 +74,7 @@ public void setup() throws Exception {
 
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap(), channelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
     }
 
     @After
@@ -452,7 +451,7 @@ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id
 
         };
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap(), channelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
     }
     
     private static class CertStores {
@@ -560,7 +559,7 @@ public SslEchoServer(Map configs, String serverHost) throws Exception
             this.newChannels = Collections.synchronizedList(new ArrayList());
             SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER);
             channelBuilder.configure(sslServerConfigs);
-            this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap(), channelBuilder);
+            this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
             setName("echoserver");
             setDaemon(true);
             acceptorThread = new AcceptorThread();
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 5222cd08a0c08..64a0921d30370 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -14,7 +14,6 @@
 
 import java.util.Arrays;
 
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -33,15 +32,15 @@ public static void main(String[] args) {
             Sensor parent = metrics.sensor("parent");
             Sensor child = metrics.sensor("child", parent);
             for (Sensor sensor : Arrays.asList(parent, child)) {
-                sensor.add(new MetricName(sensor.name() + ".avg", "grp1"), new Avg());
-                sensor.add(new MetricName(sensor.name() + ".count", "grp1"), new Count());
-                sensor.add(new MetricName(sensor.name() + ".max", "grp1"), new Max());
+                sensor.add(metrics.metricName(sensor.name() + ".avg", "grp1"), new Avg());
+                sensor.add(metrics.metricName(sensor.name() + ".count", "grp1"), new Count());
+                sensor.add(metrics.metricName(sensor.name() + ".max", "grp1"), new Max());
                 sensor.add(new Percentiles(1024,
                         0.0,
                         iters,
                         BucketSizing.CONSTANT,
-                        new Percentile(new MetricName(sensor.name() + ".median", "grp1"), 50.0),
-                        new Percentile(new MetricName(sensor.name() +  ".p_99", "grp1"), 99.0)));
+                        new Percentile(metrics.metricName(sensor.name() + ".median", "grp1"), 50.0),
+                        new Percentile(metrics.metricName(sensor.name() +  ".p_99", "grp1"), 99.0)));
             }
             long start = System.nanoTime();
             for (int i = 0; i < iters; i++)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 6275636a28854..79199a65d0432 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -18,7 +18,6 @@
 
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -67,7 +66,6 @@ public WorkerCoordinator(ConsumerNetworkClient client,
                              int heartbeatIntervalMs,
                              Metrics metrics,
                              String metricGrpPrefix,
-                             Map metricTags,
                              Time time,
                              long retryBackoffMs,
                              String restUrl,
@@ -79,13 +77,12 @@ public WorkerCoordinator(ConsumerNetworkClient client,
                 heartbeatIntervalMs,
                 metrics,
                 metricGrpPrefix,
-                metricTags,
                 time,
                 retryBackoffMs);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
         this.assignmentSnapshot = null;
-        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.listener = listener;
         this.rejoinRequested = false;
     }
@@ -254,7 +251,7 @@ private class WorkerCoordinatorMetrics {
         public final Metrics metrics;
         public final String metricGrpName;
 
-        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map tags) {
+        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
             this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
@@ -270,16 +267,12 @@ public double measure(MetricConfig config, long now) {
                 }
             };
 
-            metrics.addMetric(new MetricName("assigned-connectors",
-                            this.metricGrpName,
-                            "The number of connector instances currently assigned to this consumer",
-                            tags),
-                    numConnectors);
-            metrics.addMetric(new MetricName("assigned-tasks",
-                            this.metricGrpName,
-                            "The number of tasks currently assigned to this consumer",
-                            tags),
-                    numTasks);
+            metrics.addMetric(metrics.metricName("assigned-connectors",
+                              this.metricGrpName,
+                              "The number of connector instances currently assigned to this consumer"), numConnectors);
+            metrics.addMetric(metrics.metricName("assigned-tasks",
+                              this.metricGrpName,
+                              "The number of tasks currently assigned to this consumer"), numTasks);
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a36608a1ac031..4b243124146d9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -71,10 +71,13 @@ public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigSt
         try {
             this.time = new SystemTime();
 
-            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
-                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
             String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+            Map metricsTags = new LinkedHashMap<>();
+            metricsTags.put("client-id", clientId);
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                    .tags(metricsTags);
             List reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
@@ -83,11 +86,9 @@ public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigSt
             List addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 0);
             String metricGrpPrefix = "connect";
-            Map metricsTags = new LinkedHashMap<>();
-            metricsTags.put("client-id", clientId);
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
+                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice
@@ -102,7 +103,6 @@ public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigSt
                     config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     metrics,
                     metricGrpPrefix,
-                    metricsTags,
                     this.time,
                     retryBackoffMs,
                     restUrl,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index f47a9f9267c3a..3eab095fa0c01 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -47,7 +47,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -75,7 +74,6 @@ public class WorkerCoordinatorTest {
     private Node node = cluster.nodes().get(0);
     private Metadata metadata;
     private Metrics metrics;
-    private Map metricTags = new LinkedHashMap<>();
     private ConsumerNetworkClient consumerClient;
     private MockRebalanceListener rebalanceListener;
     @Mock private KafkaConfigStorage configStorage;
@@ -103,7 +101,6 @@ public void setup() {
                 heartbeatIntervalMs,
                 metrics,
                 "consumer" + groupId,
-                metricTags,
                 time,
                 retryBackoffMs,
                 LEADER_URL,
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 3a7e6de37bbac..a8d9964c8dede 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -226,7 +226,6 @@ object AdminClient {
       metrics,
       time,
       "admin",
-      Map[String, String](),
       channelBuilder)
 
     val networkClient = new NetworkClient(
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 2c5432eff1e3f..6fa410a4fd55d 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -111,6 +111,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
   private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))
 
+  newGauge(
+    "yammer-metrics-count",
+    new Gauge[Int] {
+      def value = {
+        com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
+      }
+    }
+  )
+
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 69a9569faa788..c3ecd750c217b 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -70,7 +70,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val allMetricNames = (0 until totalProcessorThreads).map { i =>
     val tags = new util.HashMap[String, String]()
     tags.put("networkProcessor", i.toString)
-    new MetricName("io-wait-ratio", "socket-server-metrics", tags)
+    metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
   }
 
   /**
@@ -384,7 +384,7 @@ private[kafka] class Processor(val id: Int,
   newGauge("IdlePercent",
     new Gauge[Double] {
       def value = {
-        metrics.metrics().get(new MetricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
+        metrics.metrics().get(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
       }
     },
     metricTags.asScala
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 82fec73f80a3c..37b432c9d5b1c 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -76,7 +76,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   throttledRequestReaper.start()
 
   private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
-  delayQueueSensor.add(new MetricName("queue-size",
+  delayQueueSensor.add(metrics.metricName("queue-size",
                                       apiKey,
                                       "Tracks the size of the delay queue"), new Total())
 
@@ -206,7 +206,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
           throttleTimeSensor = metrics.sensor(throttleTimeSensorName,
                                               null,
                                               ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
-          throttleTimeSensor.add(new MetricName("throttle-time",
+          throttleTimeSensor.add(metrics.metricName("throttle-time",
                                                 apiKey,
                                                 "Tracking average throttle-time per client",
                                                 "client-id",
@@ -271,7 +271,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   private def clientRateMetricName(clientId: String): MetricName = {
-    new MetricName("byte-rate", apiKey,
+    metrics.metricName("byte-rate", apiKey,
                    "Tracking byte-rate per client",
                    "client-id", clientId)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9eedbe2463da8..812016738f0d6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -141,6 +141,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     }
   )
 
+  newGauge(
+    "yammer-metrics-count",
+    new Gauge[Int] {
+      def value = {
+        com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
+      }
+    }
+  )
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index d335b3e65703b..bd7ca0e85365e 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -90,9 +90,6 @@ object JmxTool extends Logging {
         List(null)
 
     val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
-    val allAttributes: Iterable[(ObjectName, Array[String])] =
-      names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
-
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       attributesWhitelistExists match {
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index cc1f821acd420..23be1208af10b 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -125,10 +125,10 @@ class QuotasTest extends KafkaServerTestHarness {
     val numRecords = 1000
     produce(producers.head, numRecords)
 
-    val producerMetricName = new MetricName("throttle-time",
-                                    ApiKeys.PRODUCE.name,
-                                    "Tracking throttle-time per client",
-                                    "client-id", producerId1)
+    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.PRODUCE.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", producerId1)
     assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
 
     // Consumer should read in a bursty manner and get throttled immediately
@@ -136,10 +136,10 @@ class QuotasTest extends KafkaServerTestHarness {
     // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
     val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
     replicaConsumers.head.fetch(request)
-    val consumerMetricName = new MetricName("throttle-time",
-                                            ApiKeys.FETCH.name,
-                                            "Tracking throttle-time per client",
-                                            "client-id", consumerId1)
+    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.FETCH.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", consumerId1)
     assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
   }
 
@@ -166,10 +166,10 @@ class QuotasTest extends KafkaServerTestHarness {
     val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
     val numRecords = 1000
     produce(producers(1), numRecords)
-    val producerMetricName = new MetricName("throttle-time",
-                                            ApiKeys.PRODUCE.name,
-                                            "Tracking throttle-time per client",
-                                            "client-id", producerId2)
+    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.PRODUCE.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", producerId2)
     assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
 
     // The "client" consumer does not get throttled.
@@ -177,10 +177,10 @@ class QuotasTest extends KafkaServerTestHarness {
     // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
     val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
     replicaConsumers(1).fetch(request)
-    val consumerMetricName = new MetricName("throttle-time",
-                                            ApiKeys.FETCH.name,
-                                            "Tracking throttle-time per client",
-                                            "client-id", consumerId2)
+    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.FETCH.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", consumerId2)
     assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index fadcd5ae3f9fb..68d693215fe3e 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -79,7 +79,7 @@ class ClientQuotaManagerTest {
   def testQuotaViolation() {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
-    val queueSizeMetric = metrics.metrics().get(new MetricName("queue-size", "producer", ""))
+    val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "producer", ""))
     try {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 4d1ef43946510..38333a23bae38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -680,30 +680,30 @@ public StreamingMetricsImpl(Metrics metrics) {
             this.metricTags.put("client-id", clientId + "-" + getName());
 
             this.commitTimeSensor = metrics.sensor("commit-time");
-            this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
-            this.commitTimeSensor.add(new MetricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
-            this.commitTimeSensor.add(new MetricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
+            this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
+            this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
+            this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
 
             this.pollTimeSensor = metrics.sensor("poll-time");
-            this.pollTimeSensor.add(new MetricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
-            this.pollTimeSensor.add(new MetricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
-            this.pollTimeSensor.add(new MetricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
+            this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
+            this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
+            this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
 
             this.processTimeSensor = metrics.sensor("process-time");
-            this.processTimeSensor.add(new MetricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
-            this.processTimeSensor.add(new MetricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
-            this.processTimeSensor.add(new MetricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
+            this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
+            this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
+            this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
 
             this.punctuateTimeSensor = metrics.sensor("punctuate-time");
-            this.punctuateTimeSensor.add(new MetricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
-            this.punctuateTimeSensor.add(new MetricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
-            this.punctuateTimeSensor.add(new MetricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
 
             this.taskCreationSensor = metrics.sensor("task-creation");
-            this.taskCreationSensor.add(new MetricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
+            this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
 
             this.taskDestructionSensor = metrics.sensor("task-destruction");
-            this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
+            this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
         }
 
         @Override
@@ -733,11 +733,11 @@ public Sensor addLatencySensor(String scopeName, String entityName, String opera
         }
 
         private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map tags) {
-            maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(opName + "-avg-latency-ms", metricGrpName,
                 "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
-            maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(opName + "-max-latency-ms", metricGrpName,
                 "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
-            maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(opName + "-qps", metricGrpName,
                 "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
         }