diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 5610dbab218e0..1980af91b7b54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -96,7 +96,7 @@ void updateStats(TopicStats stats) { stats.replicationStats.forEach((n, as) -> { AggregatedReplicationStats replStats = - replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats()); + replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats()); replStats.msgRateIn += as.msgRateIn; replStats.msgRateOut += as.msgRateOut; replStats.msgThroughputIn += as.msgThroughputIn; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 16e438e2a2eb3..29915f071c099 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -19,8 +19,11 @@ package org.apache.pulsar.broker.stats.prometheus; import io.netty.util.concurrent.FastThreadLocal; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -32,7 +35,6 @@ import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import 
org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; -import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.CompactorMXBean; @@ -40,72 +42,75 @@ @Slf4j public class NamespaceStatsAggregator { - private static FastThreadLocal localNamespaceStats = + private static final FastThreadLocal localNamespaceStats = new FastThreadLocal() { @Override - protected AggregatedNamespaceStats initialValue() throws Exception { + protected AggregatedNamespaceStats initialValue() { return new AggregatedNamespaceStats(); } }; - private static FastThreadLocal localTopicStats = new FastThreadLocal() { + private static final FastThreadLocal localTopicStats = new FastThreadLocal() { @Override - protected TopicStats initialValue() throws Exception { + protected TopicStats initialValue() { return new TopicStats(); } }; public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) { + boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, + PrometheusMetricStreams stream) { String cluster = pulsar.getConfiguration().getClusterName(); AggregatedNamespaceStats namespaceStats = localNamespaceStats.get(); - TopicStats.resetTypes(); TopicStats topicStats = localTopicStats.get(); + Optional compactorMXBean = getCompactorMXBean(pulsar); + LongAdder topicsCount = new LongAdder(); + Map localNamespaceTopicCount = new HashMap<>(); printDefaultBrokerStats(stream, cluster); - Optional compactorMXBean = getCompactorMXBean(pulsar); - LongAdder topicsCount = new LongAdder(); pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { namespaceStats.reset(); topicsCount.reset(); - bundlesMap.forEach((bundle, topicsMap) -> { - topicsMap.forEach((name, 
topic) -> { - getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics, - pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(), - pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(), - compactorMXBean - ); - - if (includeTopicMetrics) { - topicsCount.add(1); - TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean, - splitTopicAndPartitionIndexLabel); - } else { - namespaceStats.updateStats(topicStats); - } - }); - }); + bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> { + getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics, + pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(), + pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(), + compactorMXBean + ); + + if (includeTopicMetrics) { + topicsCount.add(1); + TopicStats.printTopicStats(stream, topicStats, compactorMXBean, cluster, namespace, name, + splitTopicAndPartitionIndexLabel); + } else { + namespaceStats.updateStats(topicStats); + } + })); if (!includeTopicMetrics) { - // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report - // the same data twice, and it will make the aggregation difficult - printNamespaceStats(stream, cluster, namespace, namespaceStats); + // Only include namespace level stats if we don't have the per-topic, otherwise we're going to + // report the same data twice, and it will make the aggregation difficult + printNamespaceStats(stream, namespaceStats, cluster, namespace); } else { - printTopicsCountStats(stream, cluster, namespace, topicsCount); + localNamespaceTopicCount.put(namespace, topicsCount.sum()); } }); + + if (includeTopicMetrics) { + printTopicsCountStats(stream, localNamespaceTopicCount, cluster); + } } private static Optional getCompactorMXBean(PulsarService pulsar) { Compactor compactor = pulsar.getNullableCompactor(); - return 
Optional.ofNullable(compactor).map(c -> c.getStats()); + return Optional.ofNullable(compactor).map(Compactor::getStats); } private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize, - Optional compactorMXBean) { + boolean includeProducerMetrics, boolean getPreciseBacklog, + boolean subscriptionBacklogSize, Optional compactorMXBean) { stats.reset(); if (topic instanceof PersistentTopic) { @@ -267,161 +272,176 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include }); } - private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) { + private static void printDefaultBrokerStats(PrometheusMetricStreams stream, String cluster) { // Print metrics with 0 values. This is necessary to have the available brokers being // reported in the brokers dashboard even if they don't have any topic or traffic - metric(stream, cluster, "pulsar_topics_count", 0); - metric(stream, cluster, "pulsar_subscriptions_count", 0); - metric(stream, cluster, "pulsar_producers_count", 0); - metric(stream, cluster, "pulsar_consumers_count", 0); - metric(stream, cluster, "pulsar_rate_in", 0); - metric(stream, cluster, "pulsar_rate_out", 0); - metric(stream, cluster, "pulsar_throughput_in", 0); - metric(stream, cluster, "pulsar_throughput_out", 0); - metric(stream, cluster, "pulsar_storage_size", 0); - metric(stream, cluster, "pulsar_storage_logical_size", 0); - metric(stream, cluster, "pulsar_storage_write_rate", 0); - metric(stream, cluster, "pulsar_storage_read_rate", 0); - metric(stream, cluster, "pulsar_msg_backlog", 0); + writeMetric(stream, "pulsar_topics_count", 0, cluster); + writeMetric(stream, "pulsar_subscriptions_count", 0, cluster); + writeMetric(stream, "pulsar_producers_count", 0, cluster); + writeMetric(stream, "pulsar_consumers_count", 0, cluster); + writeMetric(stream, "pulsar_rate_in", 0, 
cluster); + writeMetric(stream, "pulsar_rate_out", 0, cluster); + writeMetric(stream, "pulsar_throughput_in", 0, cluster); + writeMetric(stream, "pulsar_throughput_out", 0, cluster); + writeMetric(stream, "pulsar_storage_size", 0, cluster); + writeMetric(stream, "pulsar_storage_logical_size", 0, cluster); + writeMetric(stream, "pulsar_storage_write_rate", 0, cluster); + writeMetric(stream, "pulsar_storage_read_rate", 0, cluster); + writeMetric(stream, "pulsar_msg_backlog", 0, cluster); } - private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace, - LongAdder topicsCount) { - metric(stream, cluster, namespace, "pulsar_topics_count", topicsCount.sum()); + private static void printTopicsCountStats(PrometheusMetricStreams stream, Map namespaceTopicsCount, + String cluster) { + namespaceTopicsCount.forEach( + (ns, topicCount) -> writeMetric(stream, "pulsar_topics_count", topicCount, cluster, ns) + ); } - private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace, - AggregatedNamespaceStats stats) { - metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount); - metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount); - metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount); - metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount); - - metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn); - metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut); - metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn); - metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut); - metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate); - - metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter); - metric(stream, cluster, namespace, 
"pulsar_in_messages_total", stats.msgInCounter); - metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter); - metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter); - - metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize); - metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize); - metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize); - metric(stream, cluster, namespace, "pulsar_storage_offloaded_size", - stats.managedLedgerStats.offloadedStorageUsed); - - metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate); - metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate); - - metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed); - - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog); + private static void printNamespaceStats(PrometheusMetricStreams stream, AggregatedNamespaceStats stats, + String cluster, String namespace) { + writeMetric(stream, "pulsar_topics_count", stats.topicsCount, cluster, namespace); + writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount, cluster, + namespace); + writeMetric(stream, "pulsar_producers_count", stats.producersCount, cluster, namespace); + writeMetric(stream, "pulsar_consumers_count", stats.consumersCount, cluster, namespace); + + writeMetric(stream, "pulsar_rate_in", stats.rateIn, cluster, namespace); + writeMetric(stream, "pulsar_rate_out", stats.rateOut, cluster, namespace); + writeMetric(stream, "pulsar_throughput_in", stats.throughputIn, cluster, namespace); + writeMetric(stream, "pulsar_throughput_out", stats.throughputOut, cluster, namespace); + writeMetric(stream, "pulsar_consumer_msg_ack_rate", stats.messageAckRate, 
cluster, namespace); + + writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace); + writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace); + writeMetric(stream, "pulsar_out_bytes_total", stats.bytesOutCounter, cluster, namespace); + writeMetric(stream, "pulsar_out_messages_total", stats.msgOutCounter, cluster, namespace); + + writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize, cluster, + namespace); + writeMetric(stream, "pulsar_storage_logical_size", + stats.managedLedgerStats.storageLogicalSize, cluster, namespace); + writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, cluster, + namespace); + writeMetric(stream, "pulsar_storage_offloaded_size", + stats.managedLedgerStats.offloadedStorageUsed, cluster, namespace); + + writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate, + cluster, namespace); + writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate, + cluster, namespace); + + writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace); + + writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace); stats.managedLedgerStats.storageWriteLatencyBuckets.refresh(); long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]); - metric(stream, 
cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_count", - stats.managedLedgerStats.storageWriteLatencyBuckets.getCount()); - metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum", - stats.managedLedgerStats.storageWriteLatencyBuckets.getSum()); + writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_count", + stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster, namespace); + writeMetric(stream, "pulsar_storage_write_latency_sum", + stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace); 
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh(); - long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow", - ledgerWritelatencyBuckets[9]); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_count", - stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount()); - metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_sum", - stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum()); + long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets(); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1], + cluster, namespace); + writeMetric(stream, 
"pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9], + cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_count", + stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace); + writeMetric(stream, "pulsar_storage_ledger_write_latency_sum", + stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace); stats.managedLedgerStats.entrySizeBuckets.refresh(); long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets(); - metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", 
entrySizeBuckets[6]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]); - metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]); - metric(stream, cluster, namespace, "pulsar_entry_size_count", - stats.managedLedgerStats.entrySizeBuckets.getCount()); - metric(stream, cluster, namespace, "pulsar_entry_size_sum", - stats.managedLedgerStats.entrySizeBuckets.getSum()); - - if (!stats.replicationStats.isEmpty()) { - stats.replicationStats.forEach((remoteCluster, replStats) -> { - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster, - replStats.msgRateIn); - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster, - replStats.msgRateOut); - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster, - replStats.msgThroughputIn); - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster, - replStats.msgThroughputOut); - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster, - replStats.replicationBacklog); - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster, - replStats.connectedCount); - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster, - replStats.msgRateExpired); - metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds", - remoteCluster, replStats.replicationDelayInSeconds); - }); - } + writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace); 
+ writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace); + writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(), + cluster, namespace); + writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(), + cluster, namespace); + + writeReplicationStat(stream, "pulsar_replication_rate_in", stats, + replStats -> replStats.msgRateIn, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_rate_out", stats, + replStats -> replStats.msgRateOut, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_throughput_in", stats, + replStats -> replStats.msgThroughputIn, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_throughput_out", stats, + replStats -> replStats.msgThroughputOut, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_backlog", stats, + replStats -> replStats.replicationBacklog, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_connected_count", stats, + replStats -> replStats.connectedCount, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_rate_expired", stats, + replStats -> replStats.msgRateExpired, cluster, namespace); + writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats, + replStats -> replStats.replicationDelayInSeconds, cluster, namespace); } - private static void metric(SimpleTextOutputStream stream, String cluster, String name, - long value) { - TopicStats.metricType(stream, name); - stream.write(name) - 
.write("{cluster=\"").write(cluster).write("\"} ") - .write(value).write(' ').write(System.currentTimeMillis()) - .write('\n'); + private static void writePulsarMsgBacklog(PrometheusMetricStreams stream, Number value, + String cluster, String namespace) { + stream.writeSample("pulsar_msg_backlog", value, "cluster", cluster, "namespace", namespace, + "remote_cluster", + "local"); } - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, - long value) { - TopicStats.metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, + String cluster) { + stream.writeSample(metricName, value, "cluster", cluster); } - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, - double value) { - TopicStats.metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster, + String namespace) { + stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace); } - private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace, - String name, String remoteCluster, double value) { - TopicStats.metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace); - stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + private static void 
writeReplicationStat(PrometheusMetricStreams stream, String metricName, + AggregatedNamespaceStats namespaceStats, + Function sampleValueFunction, + String cluster, String namespace) { + if (!namespaceStats.replicationStats.isEmpty()) { + namespaceStats.replicationStats.forEach((remoteCluster, replStats) -> + stream.writeSample(metricName, sampleValueFunction.apply(replStats), + "cluster", cluster, + "namespace", namespace, + "remote_cluster", remoteCluster) + ); + } } + + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java new file mode 100644 index 0000000000000..6b6b972c175f0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.stats.prometheus; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.util.SimpleTextOutputStream; + +/** + * Helper class to ensure that metrics of the same name are grouped together under the same TYPE header when written. + * Those are the requirements of the + * <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">Prometheus Exposition Format</a>. + */ +public class PrometheusMetricStreams { + private final Map<String, SimpleTextOutputStream> metricStreamMap = new HashMap<>(); + + /** + * Write the given metric and sample value to the stream. Will write #TYPE header if metric not seen before. + * @param metricName name of the metric. + * @param value value of the sample + * @param labelsAndValuesArray varargs of label and label value + * @throws IllegalArgumentException if the labels and values are not supplied in pairs + */ + void writeSample(String metricName, Number value, String... labelsAndValuesArray) { + // Validate before touching the stream so a malformed call cannot leave a half-written sample behind. + if (labelsAndValuesArray.length % 2 != 0) { + throw new IllegalArgumentException(String.format( + "Labels and values must be supplied in pairs for metric %s, got %d elements", + metricName, labelsAndValuesArray.length)); + } + SimpleTextOutputStream stream = initGaugeType(metricName); + stream.write(metricName).write('{'); + for (int i = 0; i < labelsAndValuesArray.length; i += 2) { + stream.write(labelsAndValuesArray[i]).write("=\"").write(labelsAndValuesArray[i + 1]).write('\"'); + if (i + 2 != labelsAndValuesArray.length) { + stream.write(','); + } + } + stream.write("} ").write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + } + + /** + * Flush all the stored metrics to the supplied stream. + * @param stream the stream to write to. 
+ */ + void flushAllToStream(SimpleTextOutputStream stream) { + metricStreamMap.values().forEach(s -> stream.write(s.getBuffer())); + } + + /** + * Release all the streams to clean up resources. 
+ */ + void releaseAll() { + metricStreamMap.values().forEach(s -> s.getBuffer().release()); + metricStreamMap.clear(); + } + + private SimpleTextOutputStream initGaugeType(String metricName) { + return metricStreamMap.computeIfAbsent(metricName, s -> { + SimpleTextOutputStream stream = new SimpleTextOutputStream(PulsarByteBufAllocator.DEFAULT.directBuffer()); + stream.write("# TYPE ").write(metricName).write(" gauge\n"); + return stream; + }); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index cd6afd1535dec..a993d1edf3a3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -52,7 +52,8 @@ /** * Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out * in a text format suitable to be consumed by Prometheus. 
- * Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/} + * Format specification can be found at <a + * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a> */ public class PrometheusMetricsGenerator { @@ -86,38 +87,43 @@ public double get() { } public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, OutputStream out) throws IOException { + boolean includeProducerMetrics, OutputStream out) throws IOException { generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null); } public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, - OutputStream out) throws IOException { + boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, + OutputStream out) throws IOException { generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, splitTopicAndPartitionIndexLabel, out, null); } public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out, - List metricsProviders) - throws IOException { + boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, + OutputStream out, + List metricsProviders) + throws IOException { ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); + //Used in namespace/topic and transaction aggregators as share metric names + PrometheusMetricStreams metricStreams = new PrometheusMetricStreams(); try { SimpleTextOutputStream stream = new SimpleTextOutputStream(buf); generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName()); NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics, - includeProducerMetrics, 
splitTopicAndPartitionIndexLabel, stream); + includeProducerMetrics, splitTopicAndPartitionIndexLabel, metricStreams); if (pulsar.getWorkerServiceOpt().isPresent()) { pulsar.getWorkerService().generateFunctionsStats(stream); } if (pulsar.getConfiguration().isTransactionCoordinatorEnabled()) { - TransactionAggregator.generate(pulsar, stream, includeTopicMetrics); + TransactionAggregator.generate(pulsar, metricStreams, includeTopicMetrics); } + metricStreams.flushAllToStream(stream); + generateBrokerBasicMetrics(pulsar, stream); generateManagedLedgerBookieClientMetrics(pulsar, stream); @@ -129,7 +135,10 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b } out.write(buf.array(), buf.arrayOffset(), buf.readableBytes()); } finally { + // always release: the content was already copied to out above, or generation failed before that buf.release(); + //release all the metrics buffers + metricStreams.releaseAll(); } } @@ -142,17 +151,17 @@ private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextO if (pulsar.getConfiguration().isExposeManagedLedgerMetricsInPrometheus()) { // generate managedLedger metrics parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(), - clusterName, Collector.Type.GAUGE, stream); + clusterName, Collector.Type.GAUGE, stream); } if (pulsar.getConfiguration().isExposeManagedCursorMetricsInPrometheus()) { // generate managedCursor metrics parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(), - clusterName, Collector.Type.GAUGE, stream); + clusterName, Collector.Type.GAUGE, stream); } parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService() - .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()), + .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()), clusterName, Collector.Type.GAUGE, stream); // generate loadBalance metrics @@ -267,17 +276,17 @@ private static void 
generateSystemMetrics(SimpleTextOutputStream stream, String static String getTypeStr(Collector.Type type) { switch (type) { - case COUNTER: - return "counter"; - case GAUGE: - return "gauge"; - case SUMMARY : - return "summary"; - case HISTOGRAM: - return "histogram"; - case UNTYPED: - default: - return "untyped"; + case COUNTER: + return "counter"; + case GAUGE: + return "gauge"; + case SUMMARY: + return "summary"; + case HISTOGRAM: + return "histogram"; + case UNTYPED: + default: + return "untyped"; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index e6e5883847df2..e91521aff5511 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -23,12 +23,12 @@ import java.util.Map; import java.util.Optional; import org.apache.bookkeeper.mledger.util.StatsBuckets; -import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.compaction.CompactionRecord; import org.apache.pulsar.compaction.CompactorMXBean; class TopicStats { - int subscriptionsCount; int producersCount; int consumersCount; @@ -43,7 +43,6 @@ class TopicStats { double averageMsgSize; public long msgBacklog; - long publishRateLimitedTimes; long backlogQuotaLimit; @@ -55,9 +54,6 @@ class TopicStats { Map subscriptionStats = new HashMap<>(); Map producerStats = new HashMap<>(); - // Used for tracking duplicate TYPE definitions - static Map metricWithTypeDefinition = new HashMap<>(); - // For compaction long compactionRemovedEventCount; long compactionSucceedCount; @@ -103,378 +99,340 @@ public void reset() { compactionLatencyBuckets.reset(); } - static void resetTypes() { - metricWithTypeDefinition.clear(); - } - - 
static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - TopicStats stats, Optional compactorMXBean, - boolean splitTopicAndPartitionIndexLabel) { - metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount, - splitTopicAndPartitionIndexLabel); - - metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize, - splitTopicAndPartitionIndexLabel); - - metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size", - stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_rate", - stats.managedLedgerStats.storageWriteRate, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, 
"pulsar_storage_backlog_size", - stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats - .offloadedStorageUsed, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time", - stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel); + public static void printTopicStats(PrometheusMetricStreams stream, TopicStats stats, + Optional compactorMXBean, String cluster, String namespace, + String topic, boolean splitTopicAndPartitionIndexLabel) { + writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_producers_count", stats.producersCount, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_consumers_count", stats.consumersCount, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + + writeMetric(stream, "pulsar_rate_in", stats.rateIn, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_rate_out", stats.rateOut, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_throughput_in", stats.throughputIn, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_throughput_out", stats.throughputOut, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + + writeMetric(stream, 
"pulsar_storage_size", stats.managedLedgerStats.storageSize, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_logical_size", + stats.managedLedgerStats.storageLogicalSize, cluster, namespace, topic, + splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_msg_backlog", stats.msgBacklog, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_offloaded_size", stats.managedLedgerStats + .offloadedStorageUsed, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1], + writeMetric(stream, "pulsar_storage_write_latency_le_0_5", + latencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + 
writeMetric(stream, "pulsar_storage_write_latency_le_1", + latencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_le_5", + latencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_le_10", + latencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_le_20", + latencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_le_50", + latencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_le_100", + latencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_le_200", + latencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_le_1000", + latencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_overflow", + latencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_count", + stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_write_latency_sum", + stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_20", latencyBuckets[4], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9], - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count", - stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum", - stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel); long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5", - ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1", - ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5", - ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10", - ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20", - 
ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50", - ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100", - ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200", - ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000", - ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow", - ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count", + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", + ledgerWriteLatencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", + ledgerWriteLatencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", + ledgerWriteLatencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", + ledgerWriteLatencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", + ledgerWriteLatencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", + ledgerWriteLatencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", + 
ledgerWriteLatencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", + ledgerWriteLatencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", + ledgerWriteLatencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", + ledgerWriteLatencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_count", stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum", + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_storage_ledger_write_latency_sum", stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), - splitTopicAndPartitionIndexLabel); + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0], + writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1], + writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], + writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], + 
writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], + writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], + writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], + writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], + writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], + writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_count", - stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", - stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(), + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(), + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); stats.producerStats.forEach((p, producerStats) -> { - 
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in", - producerStats.msgRateIn, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in", - producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size", - producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel); + writeProducerMetric(stream, "pulsar_producer_msg_rate_in", producerStats.msgRateIn, + cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel); + writeProducerMetric(stream, "pulsar_producer_msg_throughput_in", producerStats.msgThroughputIn, + cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel); + writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize, + cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel); }); - stats.subscriptionStats.forEach((n, subsStats) -> { - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", - subsStats.msgBacklog, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", - subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", - subsStats.msgDelayed, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", - subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", - subsStats.unackedMessages, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", - 
subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", - subsStats.msgRateOut, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate", - subsStats.messageAckRate, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", - subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", - subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", - subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp", - subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp", - subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp", - subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp", - subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp", - subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired", - subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired", - subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel); + stats.subscriptionStats.forEach((sub, subsStats) 
-> { + writeSubscriptionMetric(stream, "pulsar_subscription_back_log", subsStats.msgBacklog, + cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_back_log_no_delayed", + subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_delayed", + subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver", + subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages", + subsStats.unackedMessages, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_blocked_on_unacked_messages", + subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, cluster, namespace, topic, sub, + splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_out", + subsStats.msgRateOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_msg_ack_rate", + subsStats.messageAckRate, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_msg_throughput_out", + subsStats.msgThroughputOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_out_bytes_total", + subsStats.bytesOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_out_messages_total", + subsStats.msgOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_last_expire_timestamp", + subsStats.lastExpireTimestamp, cluster, namespace, topic, 
sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_last_acked_timestamp", + subsStats.lastAckedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_flow_timestamp", + subsStats.lastConsumedFlowTimestamp, cluster, namespace, topic, sub, + splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_timestamp", + subsStats.lastConsumedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_last_mark_delete_advanced_timestamp", + subsStats.lastMarkDeleteAdvancedTimestamp, cluster, namespace, topic, sub, + splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_expired", + subsStats.msgRateExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + writeSubscriptionMetric(stream, "pulsar_subscription_total_msg_expired", + subsStats.totalMsgExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); + subsStats.consumerStat.forEach((c, consumerStats) -> { - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_unacked_messages", consumerStats.unackedMessages, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_blocked_on_unacked_messages", + writeConsumerMetric(stream, "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver, + cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel); + writeConsumerMetric(stream, "pulsar_consumer_unacked_messages", consumerStats.unackedMessages, + cluster, namespace, 
topic, sub, c, splitTopicAndPartitionIndexLabel); + writeConsumerMetric(stream, "pulsar_consumer_blocked_on_unacked_messages", consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_available_permits", consumerStats.availablePermits, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_out_bytes_total", consumerStats.bytesOutCounter, - splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_out_messages_total", consumerStats.msgOutCounter, - splitTopicAndPartitionIndexLabel); + cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel); + writeConsumerMetric(stream, "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut, + cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel); + + writeConsumerMetric(stream, "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate, + cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel); + + writeConsumerMetric(stream, "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut, + cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel); + writeConsumerMetric(stream, "pulsar_consumer_available_permits", consumerStats.availablePermits, + cluster, namespace, 
topic, sub, c, splitTopicAndPartitionIndexLabel); + writeConsumerMetric(stream, "pulsar_out_bytes_total", consumerStats.bytesOutCounter, + cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel); + writeConsumerMetric(stream, "pulsar_out_messages_total", consumerStats.msgOutCounter, + cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel); }); }); if (!stats.replicationStats.isEmpty()) { stats.replicationStats.forEach((remoteCluster, replStats) -> { - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster, - replStats.msgRateIn, splitTopicAndPartitionIndexLabel); - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster, - replStats.msgRateOut, splitTopicAndPartitionIndexLabel); - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in", - remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel); - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out", - remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel); - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster, - replStats.replicationBacklog, splitTopicAndPartitionIndexLabel); - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count", - remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel); - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired", - remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel); - metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds", - remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_rate_in", replStats.msgRateIn, + cluster, namespace, topic, remoteCluster, 
splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_rate_out", replStats.msgRateOut, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_throughput_in", replStats.msgThroughputIn, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_throughput_out", replStats.msgThroughputOut, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_backlog", replStats.replicationBacklog, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds, + cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel); }); } - metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter, + writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter, + writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); // Compaction boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic)) - .map(__ -> true).orElse(false); + .isPresent(); if (hasCompaction) { - metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count", - stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel); - metric(stream, 
cluster, namespace, topic, "pulsar_compaction_succeed_count", - stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count", - stats.compactionFailedCount, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills", - stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput", - stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput", - stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count", - stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size", - stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel); - long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5", - compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1", - compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5", - compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10", - compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20", - compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50", - compactionLatencyBuckets[5], 
splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100", - compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200", - compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000", - compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow", - compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum", - stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel); - metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count", - stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_removed_event_count", + stats.compactionRemovedEventCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_succeed_count", + stats.compactionSucceedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_failed_count", + stats.compactionFailedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_duration_time_in_mills", + stats.compactionDurationTimeInMills, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_read_throughput", + stats.compactionReadThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_write_throughput", + stats.compactionWriteThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_compacted_entries_count", + stats.compactionCompactedEntriesCount, cluster, 
namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_compacted_entries_size", + stats.compactionCompactedEntriesSize, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + + long[] compactionBuckets = stats.compactionLatencyBuckets.getBuckets(); + writeMetric(stream, "pulsar_compaction_latency_le_0_5", + compactionBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_1", + compactionBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_5", + compactionBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_10", + compactionBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_20", + compactionBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_50", + compactionBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_100", + compactionBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_200", + compactionBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_le_1000", + compactionBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_overflow", + compactionBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_sum", + stats.compactionLatencyBuckets.getSum(), cluster, namespace, topic, + splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_compaction_latency_count", + stats.compactionLatencyBuckets.getCount(), 
cluster, namespace, topic, + splitTopicAndPartitionIndexLabel); } } - static void metricType(SimpleTextOutputStream stream, String name) { - - if (!metricWithTypeDefinition.containsKey(name)) { - metricWithTypeDefinition.put(name, "gauge"); - stream.write("# TYPE ").write(name).write(" gauge\n"); - } - - } - - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String name, double value, boolean splitTopicAndPartitionIndexLabel) { - metricType(stream, name); - appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} "); - stream.write(value); - appendEndings(stream); + private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster, + String namespace, String topic, boolean splitTopicAndPartitionIndexLabel) { + writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); } - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) { - metricType(stream, name); - appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) - .write("\",subscription=\"").write(subscription).write("\"} "); - stream.write(value); - appendEndings(stream); + private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster, + String namespace, String topic, String remoteCluster, + boolean splitTopicAndPartitionIndexLabel) { + writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, + "remote_cluster", remoteCluster); } - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String producerName, long produceId, String name, double value, boolean 
splitTopicAndPartitionIndexLabel) { - metricType(stream, name); - appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) - .write("\",producer_name=\"").write(producerName) - .write("\",producer_id=\"").write(produceId).write("\"} "); - stream.write(value); - appendEndings(stream); + private static void writeProducerMetric(PrometheusMetricStreams stream, String metricName, Number value, + String cluster, String namespace, String topic, String producer, + long producerId, boolean splitTopicAndPartitionIndexLabel) { + writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, + "producer_name", producer, "producer_id", String.valueOf(producerId)); } - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) { - metricType(stream, name); - appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) - .write("\",subscription=\"").write(subscription).write("\"} "); - stream.write(value); - appendEndings(stream); - } - - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String consumerName, long consumerId, String name, long value, - boolean splitTopicAndPartitionIndexLabel) { - metricType(stream, name); - appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) - .write("\",subscription=\"").write(subscription) - .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId) - .write("\"} "); - stream.write(value); - appendEndings(stream); - } - private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String consumerName, long consumerId, String name, double value, - boolean 
splitTopicAndPartitionIndexLabel) { - metricType(stream, name); - appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) - .write("\",subscription=\"").write(subscription) - .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"") - .write(consumerId).write("\"} "); - stream.write(value); - appendEndings(stream); + private static void writeSubscriptionMetric(PrometheusMetricStreams stream, String metricName, Number value, + String cluster, String namespace, String topic, String subscription, + boolean splitTopicAndPartitionIndexLabel) { + writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, + "subscription", subscription); } - private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace, - String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) { - metricType(stream, name); - appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) - .write("\",remote_cluster=\"").write(remoteCluster).write("\"} "); - stream.write(value); - appendEndings(stream); + private static void writeConsumerMetric(PrometheusMetricStreams stream, String metricName, Number value, + String cluster, String namespace, String topic, String subscription, + Consumer consumer, boolean splitTopicAndPartitionIndexLabel) { + writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, + "subscription", subscription, "consumer_name", consumer.consumerName(), + "consumer_id", String.valueOf(consumer.consumerId())); } - private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster, - String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) { - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace); + static 
void writeTopicMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster, + String namespace, String topic, boolean splitTopicAndPartitionIndexLabel, + String... extraLabelsAndValues) { + String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel ? 8 : 6]; + labelsAndValues[0] = "cluster"; + labelsAndValues[1] = cluster; + labelsAndValues[2] = "namespace"; + labelsAndValues[3] = namespace; + labelsAndValues[4] = "topic"; if (splitTopicAndPartitionIndexLabel) { int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX); if (index > 0) { - stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"") - .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length())); + labelsAndValues[5] = topic.substring(0, index); + labelsAndValues[6] = "partition"; + labelsAndValues[7] = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()); } else { - stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1"); + labelsAndValues[5] = topic; + labelsAndValues[6] = "partition"; + labelsAndValues[7] = "-1"; } } else { - stream.write("\",topic=\"").write(topic); + labelsAndValues[5] = topic; } - return stream; - } - - private static void appendEndings(SimpleTextOutputStream stream) { - stream.write(' ').write(System.currentTimeMillis()).write('\n'); + String[] labels = ArrayUtils.addAll(labelsAndValues, extraLabelsAndValues); + stream.writeSample(metricName, value, labels); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java index e6ac1535f43c7..8c58b516333f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java @@ -20,8 +20,6 @@ import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; import io.netty.util.concurrent.FastThreadLocal; -import java.util.HashMap; -import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; @@ -30,7 +28,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats; @@ -38,21 +35,10 @@ @Slf4j public class TransactionAggregator { - /** - * Used for tracking duplicate TYPE definitions. - */ - private static final FastThreadLocal> threadLocalMetricWithTypeDefinition = - new FastThreadLocal() { - @Override - protected Map initialValue() { - return new HashMap<>(); - } - }; - private static final FastThreadLocal localTransactionCoordinatorStats = new FastThreadLocal() { @Override - protected AggregatedTransactionCoordinatorStats initialValue() throws Exception { + protected AggregatedTransactionCoordinatorStats initialValue() { return new AggregatedTransactionCoordinatorStats(); } }; @@ -60,21 +46,18 @@ protected AggregatedTransactionCoordinatorStats initialValue() throws Exception private static final FastThreadLocal localManageLedgerStats = new FastThreadLocal() { @Override - protected ManagedLedgerStats initialValue() throws Exception { + protected ManagedLedgerStats initialValue() { return new ManagedLedgerStats(); } }; - public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) { + public static void generate(PulsarService pulsar, PrometheusMetricStreams stream, boolean 
includeTopicMetrics) { String cluster = pulsar.getConfiguration().getClusterName(); - Map metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get(); - metricWithTypeDefinition.clear(); if (includeTopicMetrics) { - pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { - bundlesMap.forEach((bundle, topicsMap) -> { - topicsMap.forEach((name, topic) -> { + pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> + bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> { if (topic instanceof PersistentTopic) { topic.getSubscriptions().values().forEach(subscription -> { try { @@ -82,9 +65,8 @@ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName())) && subscription instanceof PersistentSubscription && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) { - ManagedLedger managedLedger = - ((PersistentSubscription) subscription) - .getPendingAckManageLedger().get(); + ManagedLedger managedLedger = ((PersistentSubscription) subscription) + .getPendingAckManageLedger().get(); generateManageLedgerStats(managedLedger, stream, cluster, namespace, name, subscription.getName()); } @@ -93,9 +75,7 @@ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, } }); } - }); - }); - }); + }))); } AggregatedTransactionCoordinatorStats transactionCoordinatorStats = localTransactionCoordinatorStats.get(); @@ -124,18 +104,18 @@ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, localManageLedgerStats.get().reset(); if (transactionMetadataStore instanceof MLTransactionMetadataStore) { - ManagedLedger managedLedger = - ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger(); + ManagedLedger managedLedger = + ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger(); 
generateManageLedgerStats(managedLedger, stream, cluster, NamespaceName.SYSTEM_NAMESPACE.toString(), MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + transactionCoordinatorID.getId(), MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME); } - }); + }); } - private static void generateManageLedgerStats(ManagedLedger managedLedger, SimpleTextOutputStream stream, + private static void generateManageLedgerStats(ManagedLedger managedLedger, PrometheusMetricStreams stream, String cluster, String namespace, String topic, String subscription) { ManagedLedgerStats managedLedgerStats = localManageLedgerStats.get(); ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats(); @@ -157,174 +137,149 @@ private static void generateManageLedgerStats(ManagedLedger managedLedger, Simpl managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate(); managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate(); - printManageLedgerStats(stream, cluster, namespace, topic, - subscription, managedLedgerStats); - } - - private static void metricType(SimpleTextOutputStream stream, String name) { - Map metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get(); - if (!metricWithTypeDefinition.containsKey(name)) { - metricWithTypeDefinition.put(name, "gauge"); - stream.write("# TYPE ").write(name).write(" gauge\n"); - } - - } - - private static void metric(SimpleTextOutputStream stream, String cluster, String name, - double value, long coordinatorId) { - metricType(stream, name); - stream.write(name) - .write("{cluster=\"").write(cluster) - .write("\",coordinator_id=\"").write(coordinatorId).write("\"} ") - .write(value).write(' ').write(System.currentTimeMillis()) - .write('\n'); + printManageLedgerStats(stream, cluster, namespace, topic, subscription, managedLedgerStats); } - private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace, - String topic, String subscription, String name, long value) { - 
stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); - } - - private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace, - String topic, String subscription, String name, double value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); - } - - private static void printManageLedgerStats(SimpleTextOutputStream stream, String cluster, String namespace, + private static void printManageLedgerStats(PrometheusMetricStreams stream, String cluster, String namespace, String topic, String subscription, ManagedLedgerStats stats) { - metrics(stream, cluster, namespace, topic, subscription, - "pulsar_storage_size", stats.storageSize); - metrics(stream, cluster, namespace, topic, subscription, - "pulsar_storage_logical_size", stats.storageLogicalSize); - metrics(stream, cluster, namespace, topic, subscription, - "pulsar_storage_backlog_size", stats.backlogSize); - metrics(stream, cluster, namespace, topic, subscription, - "pulsar_storage_offloaded_size", stats.offloadedStorageUsed); + writeMetric(stream, "pulsar_storage_size", stats.storageSize, cluster, namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_logical_size", stats.storageLogicalSize, cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_backlog_size", stats.backlogSize, cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed, cluster, namespace, topic, + subscription); - metrics(stream, cluster, namespace, topic, 
subscription, - "pulsar_storage_write_rate", stats.storageWriteRate); - metrics(stream, cluster, namespace, topic, subscription, - "pulsar_storage_read_rate", stats.storageReadRate); + writeMetric(stream, "pulsar_storage_write_rate", stats.storageWriteRate, cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_read_rate", stats.storageReadRate, cluster, namespace, topic, + subscription); stats.storageWriteLatencyBuckets.refresh(); long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_1", latencyBuckets[1]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_5", latencyBuckets[2]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_10", latencyBuckets[3]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_20", latencyBuckets[4]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_50", latencyBuckets[5]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_100", latencyBuckets[6]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_200", latencyBuckets[7]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_le_1000", latencyBuckets[8]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_write_latency_overflow", latencyBuckets[9]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_count", - stats.storageWriteLatencyBuckets.getCount()); - metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_sum", - 
stats.storageWriteLatencyBuckets.getSum()); + writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_storage_write_latency_count", stats.storageWriteLatencyBuckets.getCount(), + cluster, namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_write_latency_sum", stats.storageWriteLatencyBuckets.getSum(), cluster, + namespace, topic, subscription); stats.storageLedgerWriteLatencyBuckets.refresh(); - long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]); - 
metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_overflow", - ledgerWritelatencyBuckets[9]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_count", - stats.storageLedgerWriteLatencyBuckets.getCount()); - metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_sum", - stats.storageLedgerWriteLatencyBuckets.getSum()); + long[] ledgerWriteLatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets(); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", 
ledgerWriteLatencyBuckets[3], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9], cluster, + namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_count", + stats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace, topic, subscription); + writeMetric(stream, "pulsar_storage_ledger_write_latency_sum", + stats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace, topic, subscription); stats.entrySizeBuckets.refresh(); long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_128", entrySizeBuckets[0]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_512", entrySizeBuckets[1]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]); - 
metric(stream, cluster, namespace, topic, subscription, - "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]); - metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_entry_size_le_overflow", entrySizeBuckets[8]); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_entry_size_count", stats.entrySizeBuckets.getCount()); - metric(stream, cluster, namespace, topic, subscription, - "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum()); - } - - private static void metric(SimpleTextOutputStream stream, String cluster, - String namespace, String topic, String subscription, - String name, long value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic, + subscription); + writeMetric(stream, 
"pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic, + subscription); + writeMetric(stream, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount(), cluster, namespace, + topic, subscription); + writeMetric(stream, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum(), cluster, namespace, topic, + subscription); } - static void printTransactionCoordinatorStats(SimpleTextOutputStream stream, String cluster, + static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, String cluster, AggregatedTransactionCoordinatorStats stats, long coordinatorId) { - metric(stream, cluster, "pulsar_txn_active_count", - stats.actives, coordinatorId); - metric(stream, cluster, "pulsar_txn_committed_count", - stats.committedCount, coordinatorId); - metric(stream, cluster, "pulsar_txn_aborted_count", - stats.abortedCount, coordinatorId); - metric(stream, cluster, "pulsar_txn_created_count", - stats.createdCount, coordinatorId); - metric(stream, cluster, "pulsar_txn_timeout_count", - stats.timeoutCount, coordinatorId); - metric(stream, cluster, "pulsar_txn_append_log_count", - stats.appendLogCount, coordinatorId); + writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_committed_count", stats.committedCount, cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_aborted_count", stats.abortedCount, cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_created_count", stats.createdCount, cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_timeout_count", stats.timeoutCount, cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_append_log_count", stats.appendLogCount, cluster, + coordinatorId); long[] latencyBuckets = stats.executionLatency; - metric(stream, cluster, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], coordinatorId); - 
metric(stream, cluster, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_300000", - latencyBuckets[10], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_1500000", - latencyBuckets[11], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_3000000", - latencyBuckets[12], coordinatorId); - metric(stream, cluster, "pulsar_txn_execution_latency_le_overflow", - latencyBuckets[13], coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], cluster, coordinatorId); + writeMetric(stream, 
"pulsar_txn_execution_latency_le_15000", latencyBuckets[7], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_300000", latencyBuckets[10], cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_1500000", latencyBuckets[11], cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_3000000", latencyBuckets[12], cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_execution_latency_le_overflow", latencyBuckets[13], cluster, + coordinatorId); + } + + private static void writeMetric(PrometheusMetricStreams stream, String metricName, double value, String cluster, + long coordinatorId) { + stream.writeSample(metricName, value, "cluster", cluster, "coordinator_id", String.valueOf(coordinatorId)); + } + + private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster, + String namespace, String topic, String subscription) { + stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace, "topic", topic, + "subscription", subscription); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java index 7550096c2b584..8f704b11e764c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java @@ -18,13 +18,8 @@ */ package org.apache.pulsar.broker.stats.prometheus.metrics; -import io.prometheus.client.Collector; -import io.prometheus.client.Collector.MetricFamilySamples; 
-import io.prometheus.client.Collector.MetricFamilySamples.Sample; -import io.prometheus.client.CollectorRegistry; import java.io.IOException; import java.io.Writer; -import java.util.Enumeration; import org.apache.bookkeeper.stats.Counter; /** @@ -140,31 +135,4 @@ private static void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String .append(success.toString()).append("\"} ") .append(Double.toString(opStat.getSum(success))).append('\n'); } - - public static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry) - throws IOException { - Enumeration metricFamilySamples = registry.metricFamilySamples(); - while (metricFamilySamples.hasMoreElements()) { - MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); - - for (int i = 0; i < metricFamily.samples.size(); i++) { - Sample sample = metricFamily.samples.get(i); - w.write(sample.name); - w.write('{'); - for (int j = 0; j < sample.labelNames.size(); j++) { - if (j != 0) { - w.write(", "); - } - w.write(sample.labelNames.get(j)); - w.write("=\""); - w.write(sample.labelValues.get(j)); - w.write('"'); - } - - w.write("} "); - w.write(Collector.doubleToGoString(sample.value)); - w.write('\n'); - } - } - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index f28412ea75160..be10c8dc65e94 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -50,6 +50,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.crypto.SecretKey; @@ -1404,6 +1405,63 @@ private void 
compareCompactionStateCount(List cm, double count) { assertEquals(cm.get(0).value, count); } + @Test + public void testMetricsGroupedByTypeDefinitions() throws Exception { + Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); + Producer p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + p2.send(message.getBytes()); + } + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + String metricsStr = statsOut.toString(); + + Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)"); + Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+"); + + AtomicReference currentMetric = new AtomicReference<>(); + Splitter.on("\n").split(metricsStr).forEach(line -> { + if (line.isEmpty()) { + return; + } + if (line.startsWith("#")) { + // Get the current type definition + Matcher typeMatcher = typePattern.matcher(line); + checkArgument(typeMatcher.matches()); + String metricName = typeMatcher.group(1); + currentMetric.set(metricName); + } else { + Matcher metricMatcher = metricNamePattern.matcher(line); + checkArgument(metricMatcher.matches()); + String metricName = metricMatcher.group(1); + + if (metricName.endsWith("_bucket")) { + metricName = metricName.substring(0, metricName.indexOf("_bucket")); + } else if (metricName.endsWith("_count") && !currentMetric.get().endsWith("_count")) { + metricName = metricName.substring(0, metricName.indexOf("_count")); + } else if (metricName.endsWith("_sum") && !currentMetric.get().endsWith("_sum")) { + metricName = metricName.substring(0, metricName.indexOf("_sum")); + } else if (metricName.endsWith("_total") && !currentMetric.get().endsWith("_total")) { + metricName = metricName.substring(0, metricName.indexOf("_total")); + } else if 
(metricName.endsWith("_created") && !currentMetric.get().endsWith("_created")) { + metricName = metricName.substring(0, metricName.indexOf("_created")); + } + + if (!metricName.equals(currentMetric.get())) { + System.out.println(metricsStr); + fail("Metric not grouped under its type definition: " + line); + } + + } + }); + + p1.close(); + p2.close(); + } + /** * Hacky parsing of Prometheus text format. Should be good enough for unit tests */ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java new file mode 100644 index 0000000000000..15c29a0dc66bb --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.stats.prometheus; + +import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class PrometheusMetricStreamsTest { + + private PrometheusMetricStreams underTest; + + @BeforeMethod(alwaysRun = true) + protected void setup() throws Exception { + underTest = new PrometheusMetricStreams(); + } + + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + underTest.releaseAll(); + } + + @Test + public void canWriteSampleWithoutLabels() { + underTest.writeSample("my-metric", 123); + + String actual = writeToString(); + + assertTrue(actual.startsWith("# TYPE my-metric gauge"), "Gauge type line missing"); + assertTrue(actual.contains("my-metric{} 123"), "Metric line missing"); + } + + @Test + public void canWriteSampleWithLabels() { + underTest.writeSample("my-other-metric", 123, "cluster", "local"); + underTest.writeSample("my-other-metric", 456, "cluster", "local", "namespace", "my-ns"); + + String actual = writeToString(); + + assertTrue(actual.startsWith("# TYPE my-other-metric gauge"), "Gauge type line missing"); + assertTrue(actual.contains("my-other-metric{cluster=\"local\"} 123"), "Cluster metric line missing"); + assertTrue(actual.contains("my-other-metric{cluster=\"local\",namespace=\"my-ns\"} 456"), + "Cluster and Namespace metric line missing"); + } + + private String writeToString() { + ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(); + try { + SimpleTextOutputStream stream = new SimpleTextOutputStream(buffer); + underTest.flushAllToStream(stream); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int readIndex = 
buffer.readerIndex(); + int readableBytes = buffer.readableBytes(); + for (int i = 0; i < readableBytes; i++) { + out.write(buffer.getByte(readIndex + i)); + } + return out.toString(); + } finally { + buffer.release(); + } + } +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java index 9fc4b347c854f..dd78b4cfe58b9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java @@ -22,12 +22,11 @@ /** * Format strings and numbers into a ByteBuf without any memory allocation. - * */ public class SimpleTextOutputStream { private final ByteBuf buffer; - private static final char[] hexChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', - 'f' }; + private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', + 'f'}; public SimpleTextOutputStream(ByteBuf buffer) { this.buffer = buffer; @@ -131,4 +130,12 @@ public SimpleTextOutputStream write(double d) { write(r); return this; } + + public void write(ByteBuf byteBuf) { + buffer.writeBytes(byteBuf); + } + + public ByteBuf getBuffer() { + return buffer; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java index f7a205c7db02c..f5aa273f656ac 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java @@ -37,6 +37,11 @@ public static void write004(Writer writer, Enumeration 0) { diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java index c8b411cbf5706..2ad407b2e5e3e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java @@ -328,6 +328,11 @@ private void generateLeaderMetrics(StringWriter stream) { } private void writeMetric(String metricName, long value, StringWriter stream) { + stream.write("# TYPE "); + stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX); + stream.write(metricName); + stream.write(" gauge \n"); + stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX); stream.write(metricName); stream.write("{");