Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,14 @@ private KafkaConsumer(ConsumerConfig config,
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time = new SystemTime();

MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricsTags = new LinkedHashMap<String, String>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
Expand All @@ -546,11 +548,9 @@ private KafkaConsumer(ConsumerConfig config,
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "consumer";
Map<String, String> metricsTags = new LinkedHashMap<String, String>();
metricsTags.put("client-id", clientId);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
Expand All @@ -573,7 +573,6 @@ private KafkaConsumer(ConsumerConfig config,
this.subscriptions,
metrics,
metricGrpPrefix,
metricsTags,
this.time,
retryBackoffMs,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
Expand Down Expand Up @@ -606,7 +605,6 @@ private KafkaConsumer(ConsumerConfig config,
this.subscriptions,
metrics,
metricGrpPrefix,
metricsTags,
this.time,
this.retryBackoffMs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
Expand Down Expand Up @@ -107,7 +106,6 @@ public AbstractCoordinator(ConsumerNetworkClient client,
int heartbeatIntervalMs,
Metrics metrics,
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
long retryBackoffMs) {
this.client = client;
Expand All @@ -119,7 +117,7 @@ public AbstractCoordinator(ConsumerNetworkClient client,
this.sessionTimeoutMs = sessionTimeoutMs;
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
this.heartbeatTask = new HeartbeatTask();
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
}

Expand Down Expand Up @@ -679,58 +677,49 @@ private class GroupCoordinatorMetrics {
public final Sensor joinLatency;
public final Sensor syncLatency;

public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

this.heartbeatLatency = metrics.sensor("heartbeat-latency");
this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max",
this.metricGrpName,
"The max time taken to receive a response to a heartbeat request",
tags), new Max());
this.heartbeatLatency.add(new MetricName("heartbeat-rate",
"The max time taken to receive a response to a heartbeat request"), new Max());
this.heartbeatLatency.add(metrics.metricName("heartbeat-rate",
this.metricGrpName,
"The average number of heartbeats per second",
tags), new Rate(new Count()));
"The average number of heartbeats per second"), new Rate(new Count()));

this.joinLatency = metrics.sensor("join-latency");
this.joinLatency.add(new MetricName("join-time-avg",
this.joinLatency.add(metrics.metricName("join-time-avg",
this.metricGrpName,
"The average time taken for a group rejoin",
tags), new Avg());
this.joinLatency.add(new MetricName("join-time-max",
"The average time taken for a group rejoin"), new Avg());
this.joinLatency.add(metrics.metricName("join-time-max",
this.metricGrpName,
"The max time taken for a group rejoin",
tags), new Avg());
this.joinLatency.add(new MetricName("join-rate",
"The max time taken for a group rejoin"), new Avg());
this.joinLatency.add(metrics.metricName("join-rate",
this.metricGrpName,
"The number of group joins per second",
tags), new Rate(new Count()));
"The number of group joins per second"), new Rate(new Count()));

this.syncLatency = metrics.sensor("sync-latency");
this.syncLatency.add(new MetricName("sync-time-avg",
this.syncLatency.add(metrics.metricName("sync-time-avg",
this.metricGrpName,
"The average time taken for a group sync",
tags), new Avg());
this.syncLatency.add(new MetricName("sync-time-max",
"The average time taken for a group sync"), new Avg());
this.syncLatency.add(metrics.metricName("sync-time-max",
this.metricGrpName,
"The max time taken for a group sync",
tags), new Avg());
this.syncLatency.add(new MetricName("sync-rate",
"The max time taken for a group sync"), new Avg());
this.syncLatency.add(metrics.metricName("sync-rate",
this.metricGrpName,
"The number of group syncs per second",
tags), new Rate(new Count()));
"The number of group syncs per second"), new Rate(new Count()));

Measurable lastHeartbeat =
new Measurable() {
public double measure(MetricConfig config, long now) {
return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
}
};
metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
this.metricGrpName,
"The number of seconds since the last controller heartbeat",
tags),
"The number of seconds since the last controller heartbeat"),
lastHeartbeat);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
Expand Down Expand Up @@ -82,7 +81,6 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
long retryBackoffMs,
OffsetCommitCallback defaultOffsetCommitCallback,
Expand All @@ -94,7 +92,6 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
heartbeatIntervalMs,
metrics,
metricGrpPrefix,
metricTags,
time,
retryBackoffMs);
this.metadata = metadata;
Expand All @@ -109,7 +106,7 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
addMetadataListener();

this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
}

@Override
Expand Down Expand Up @@ -639,35 +636,30 @@ private class ConsumerCoordinatorMetrics {

public final Sensor commitLatency;

public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

this.commitLatency = metrics.sensor("commit-latency");
this.commitLatency.add(new MetricName("commit-latency-avg",
this.commitLatency.add(metrics.metricName("commit-latency-avg",
this.metricGrpName,
"The average time taken for a commit request",
tags), new Avg());
this.commitLatency.add(new MetricName("commit-latency-max",
"The average time taken for a commit request"), new Avg());
this.commitLatency.add(metrics.metricName("commit-latency-max",
this.metricGrpName,
"The max time taken for a commit request",
tags), new Max());
this.commitLatency.add(new MetricName("commit-rate",
"The max time taken for a commit request"), new Max());
this.commitLatency.add(metrics.metricName("commit-rate",
this.metricGrpName,
"The number of commit calls per second",
tags), new Rate(new Count()));
"The number of commit calls per second"), new Rate(new Count()));

Measurable numParts =
new Measurable() {
public double measure(MetricConfig config, long now) {
return subscriptions.assignedPartitions().size();
}
};
metrics.addMetric(new MetricName("assigned-partitions",
metrics.addMetric(metrics.metricName("assigned-partitions",
this.metricGrpName,
"The number of partitions currently assigned to this consumer",
tags),
numParts);
"The number of partitions currently assigned to this consumer"), numParts);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -99,7 +98,6 @@ public Fetcher(ConsumerNetworkClient client,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
long retryBackoffMs) {

Expand All @@ -120,7 +118,7 @@ public Fetcher(ConsumerNetworkClient client,
this.unauthorizedTopics = new HashSet<>();
this.recordTooLargePartitions = new HashMap<>();

this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
}

Expand Down Expand Up @@ -656,64 +654,53 @@ private class FetchManagerMetrics {
public final Sensor fetchThrottleTimeSensor;


public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";

this.bytesFetched = metrics.sensor("bytes-fetched");
this.bytesFetched.add(new MetricName("fetch-size-avg",
this.bytesFetched.add(metrics.metricName("fetch-size-avg",
this.metricGrpName,
"The average number of bytes fetched per request",
tags), new Avg());
this.bytesFetched.add(new MetricName("fetch-size-max",
"The average number of bytes fetched per request"), new Avg());
this.bytesFetched.add(metrics.metricName("fetch-size-max",
this.metricGrpName,
"The maximum number of bytes fetched per request",
tags), new Max());
this.bytesFetched.add(new MetricName("bytes-consumed-rate",
"The maximum number of bytes fetched per request"), new Max());
this.bytesFetched.add(metrics.metricName("bytes-consumed-rate",
this.metricGrpName,
"The average number of bytes consumed per second",
tags), new Rate());
"The average number of bytes consumed per second"), new Rate());

this.recordsFetched = metrics.sensor("records-fetched");
this.recordsFetched.add(new MetricName("records-per-request-avg",
this.recordsFetched.add(metrics.metricName("records-per-request-avg",
this.metricGrpName,
"The average number of records in each request",
tags), new Avg());
this.recordsFetched.add(new MetricName("records-consumed-rate",
"The average number of records in each request"), new Avg());
this.recordsFetched.add(metrics.metricName("records-consumed-rate",
this.metricGrpName,
"The average number of records consumed per second",
tags), new Rate());
"The average number of records consumed per second"), new Rate());

this.fetchLatency = metrics.sensor("fetch-latency");
this.fetchLatency.add(new MetricName("fetch-latency-avg",
this.fetchLatency.add(metrics.metricName("fetch-latency-avg",
this.metricGrpName,
"The average time taken for a fetch request.",
tags), new Avg());
this.fetchLatency.add(new MetricName("fetch-latency-max",
"The average time taken for a fetch request."), new Avg());
this.fetchLatency.add(metrics.metricName("fetch-latency-max",
this.metricGrpName,
"The max time taken for any fetch request.",
tags), new Max());
this.fetchLatency.add(new MetricName("fetch-rate",
"The max time taken for any fetch request."), new Max());
this.fetchLatency.add(metrics.metricName("fetch-rate",
this.metricGrpName,
"The number of fetch requests per second.",
tags), new Rate(new Count()));
"The number of fetch requests per second."), new Rate(new Count()));

this.recordsFetchLag = metrics.sensor("records-lag");
this.recordsFetchLag.add(new MetricName("records-lag-max",
this.recordsFetchLag.add(metrics.metricName("records-lag-max",
this.metricGrpName,
"The maximum lag in terms of number of records for any partition in this window",
tags), new Max());
"The maximum lag in terms of number of records for any partition in this window"), new Max());

this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg",
this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
this.metricGrpName,
"The average throttle time in ms",
tags), new Avg());
"The average throttle time in ms"), new Avg());

this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max",
this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
this.metricGrpName,
"The maximum throttle time in ms",
tags), new Max());
"The maximum throttle time in ms"), new Max());
}

public void recordTopicFetchMetrics(String topic, int bytes, int records) {
Expand Down
Loading