diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2f0bf0cb9da90..44048b78cf305 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -32,6 +32,7 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -88,6 +89,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.validator.MultipleListenerValidator; @@ -204,6 +206,8 @@ public class PulsarService implements AutoCloseable { // packages management service private PackagesManagement packagesManagement; + private PrometheusMetricsServlet metricsServlet; + private List pendingMetricsProviders; public enum State { @@ -513,11 +517,16 @@ public Boolean get() { "org.apache.pulsar.broker.admin.v3", true, attributeMap); this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap); + this.metricsServlet = new PrometheusMetricsServlet( + this, config.isExposeTopicLevelMetricsInPrometheus(), + config.isExposeConsumerLevelMetricsInPrometheus()); + if (pendingMetricsProviders != null) { + pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); + this.pendingMetricsProviders = null; + } this.webService.addServlet("/metrics", - new ServletHolder(new PrometheusMetricsServlet( - this, config.isExposeTopicLevelMetricsInPrometheus(), - config.isExposeConsumerLevelMetricsInPrometheus())), + new ServletHolder(metricsServlet), false, attributeMap); if (config.isWebSocketServiceEnabled()) { @@ -1235,6 +1244,17 @@ public TopicPoliciesService getTopicPoliciesService() { return topicPoliciesService; } + public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { + if (metricsServlet == null) { + if (pendingMetricsProviders == null) { + pendingMetricsProviders = new LinkedList<>(); + } + pendingMetricsProviders.add(metricsProvider); + } else { + this.metricsServlet.addRawMetricsProvider(metricsProvider); + } + } + private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index eb172cd0b558c..62f695372e346 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -56,7 +56,6 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b printDefaultBrokerStats(stream, cluster); LongAdder topicsCount = new LongAdder(); - pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { namespaceStats.reset(); topicsCount.reset(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index a1d4235992ca6..d0acdd396cd07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Enumeration; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.bookkeeper.stats.NullStatsProvider; @@ -83,7 +84,12 @@ public double get() { } public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - OutputStream out) throws IOException { + OutputStream out) throws IOException { + generate(pulsar, includeTopicMetrics, includeConsumerMetrics, out, null); + } + + public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, + OutputStream out, List metricsProviders) throws IOException { ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); try { SimpleTextOutputStream stream = new SimpleTextOutputStream(buf); @@ -100,6 +106,11 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b generateManagedLedgerBookieClientMetrics(pulsar, stream); + if (metricsProviders != null) { + for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) { + metricsProvider.generate(stream); + } + } out.write(buf.array(), buf.arrayOffset(), buf.readableBytes()); } finally { buf.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 5ec5215a3144f..7f36fa701d1c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -21,6 +21,8 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.servlet.AsyncContext; @@ -40,6 +42,7 @@ public class PrometheusMetricsServlet extends HttpServlet { private final PulsarService pulsar; private final boolean shouldExportTopicMetrics; private final boolean shouldExportConsumerMetrics; + private List metricsProviders; private ExecutorService executor = null; @@ -64,7 +67,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) res.setStatus(HttpStatus.OK_200); res.setContentType("text/plain"); PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, - res.getOutputStream()); + res.getOutputStream(), metricsProviders); context.complete(); } catch (IOException e) { @@ -82,5 +85,12 @@ public void destroy() { } } + public void addRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { + if (metricsProviders == null) { + metricsProviders = new LinkedList<>(); + } + metricsProviders.add(metricsProvider); + } + private static final Logger log = LoggerFactory.getLogger(PrometheusMetricsServlet.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java new file mode 100644 index 0000000000000..d2067766a41b7 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus; + +import org.apache.pulsar.common.util.SimpleTextOutputStream; + +/** + * The prometheus metrics provider for generate prometheus format metrics. + */ +public interface PrometheusRawMetricsProvider { + + /** + * Generate the metrics from the metrics provider. + * @param stream the stream that write the metrics to + */ + void generate(SimpleTextOutputStream stream); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 40fa9f989fd6d..97ddcca0de82c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -29,7 +29,10 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; @@ -51,8 +54,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; @@ -68,6 +77,8 @@ import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -962,4 +973,27 @@ public void testStuckTopicUnloading() throws Exception { } assertNull(ledgers.get(topicMlName)); } + + @Test + public void testMetricsProvider() throws IOException { + PrometheusRawMetricsProvider rawMetricsProvider = new PrometheusRawMetricsProvider() { + @Override + public void generate(SimpleTextOutputStream stream) { + stream.write("test_metrics{label1=\"xyz\"} 10 \n"); + } + }; + getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider); + HttpClient httpClient = HttpClientBuilder.create().build(); + final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics"; + HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint)); + InputStream inputStream = response.getEntity().getContent(); + InputStreamReader isReader = new InputStreamReader(inputStream); + BufferedReader reader = new BufferedReader(isReader); + StringBuffer sb = new StringBuffer(); + String str; + while((str = reader.readLine()) != null){ + sb.append(str); + } + Assert.assertTrue(sb.toString().contains("test_metrics")); + } }