io.opencensus
opencensus-contrib-grpc-metrics
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index 035e2e14f746b..47ce4da802d98 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -82,6 +82,7 @@
net.java.dev.jna:*
com.carrotsearch:*
io.prometheus:*
+ io.perfmark:*
com.github.ben-manes.caffeine:*
org.glassfish.jersey.*:*
org.rocksdb:*
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
index 3ef453ddc654e..2dcfb10247100 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
@@ -55,7 +55,7 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger {
private final LongAdder successSumAdder = new LongAdder();
private final LongAdder failSumAdder = new LongAdder();
- DataSketchesOpStatsLogger() {
+ public DataSketchesOpStatsLogger() {
this.current = new ThreadLocalAccessor();
this.replacement = new ThreadLocalAccessor();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
index 96e7bd2fec59f..0ef11d8e37483 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
@@ -46,7 +46,7 @@ public class DataSketchesSummaryLogger {
private final LongAdder countAdder = new LongAdder();
private final LongAdder sumAdder = new LongAdder();
- DataSketchesSummaryLogger() {
+ public DataSketchesSummaryLogger() {
this.current = new ThreadLocalAccessor();
this.replacement = new ThreadLocalAccessor();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java
new file mode 100644
index 0000000000000..39be12b35509b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/LongAdderCounter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.bookkeeper.stats.Counter;
+
+/**
+ * {@link Counter} implementation based on {@link LongAdder}.
+ *
+ * LongAdder keeps a counter per-thread and then aggregates to get the result, in order to avoid contention between
+ * multiple threads.
+ */
+public class LongAdderCounter implements Counter {
+ private final LongAdder counter = new LongAdder();
+
+ public LongAdderCounter() {
+
+ }
+
+ @Override
+ public void clear() {
+ counter.reset();
+ }
+
+ @Override
+ public void inc() {
+ counter.increment();
+ }
+
+ @Override
+ public void dec() {
+ counter.decrement();
+ }
+
+ @Override
+ public void add(long delta) {
+ counter.add(delta);
+ }
+
+ @Override
+ public Long get() {
+ return counter.sum();
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ObserverGauge.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ObserverGauge.java
index 8cc7ff3c04963..93f39c802ce9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ObserverGauge.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ObserverGauge.java
@@ -33,6 +33,8 @@ public class ObserverGauge extends SimpleCollector implemen
public static class Builder extends SimpleCollector.Builder {
private Supplier supplier;
+ public Builder() {}
+
public Builder supplier(Supplier supplier) {
this.supplier = supplier;
return this;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
index e05c6c43326cc..1007f41c47dad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
@@ -24,7 +24,6 @@
import org.apache.bookkeeper.stats.CachingStatsProvider;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.stats.prometheus.LongAdderCounter;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
@@ -53,9 +52,9 @@ public class PrometheusMetricsProvider implements StatsProvider {
/**
* These acts a registry of the metrics defined in this provider
*/
- final ConcurrentMap counters = new ConcurrentSkipListMap<>();
- final ConcurrentMap> gauges = new ConcurrentSkipListMap<>();
- final ConcurrentMap opStats = new ConcurrentSkipListMap<>();
+ public final ConcurrentMap counters = new ConcurrentSkipListMap<>();
+ public final ConcurrentMap> gauges = new ConcurrentSkipListMap<>();
+ public final ConcurrentMap opStats = new ConcurrentSkipListMap<>();
public PrometheusMetricsProvider() {
this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java
index ad3c62f78d731..43efcc9b0b812 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java
@@ -24,7 +24,6 @@
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.prometheus.LongAdderCounter;
/**
* A {@code Prometheus} based {@link StatsLogger} implementation.
@@ -34,7 +33,7 @@ public class PrometheusStatsLogger implements StatsLogger {
private final PrometheusMetricsProvider provider;
private final String scope;
- PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope) {
+ public PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope) {
this.provider = provider;
this.scope = scope;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
index abe0b560aa746..0f9a4c86924fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -128,7 +128,8 @@ private static void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String
.append(Double.toString(opStat.getSum(success))).append('\n');
}
- static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry) throws IOException {
+ public static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry)
+ throws IOException {
Enumeration metricFamilySamples = registry.metricFamilySamples();
while (metricFamilySamples.hasMoreElements()) {
MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
index a93a26c1bd1e6..612168a2d9143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
@@ -31,7 +31,7 @@ public SimpleGauge(final Gauge gauge) {
this.gauge = gauge;
}
- Number getSample() {
+ public Number getSample() {
return gauge.getSample();
}
}
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index de67904b64495..1102587b0a106 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -87,6 +87,7 @@
org.glassfish.hk2*:*
com.fasterxml.jackson.*:*
io.grpc:*
+ io.perfmark:*
com.yahoo.datasketches:*
com.squareup.*:*
com.google.*:*
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 490360e2dcbe7..a71e66ed000c3 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -134,6 +134,7 @@
org.glassfish.hk2*:*
com.fasterxml.jackson.*:*
io.grpc:*
+ io.perfmark:*
com.yahoo.datasketches:*
io.netty:*
com.squareup.*:*
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index f8d3e1e8d4395..206f574767c32 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -123,6 +123,7 @@
com.fasterxml.jackson.core:jackson-core
com.fasterxml.jackson.dataformat
io.netty:*
+ io.perfmark:*
org.eclipse.jetty:*
com.yahoo.datasketches:*
commons-*:*
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
index 620cb35e56122..c1e857487bf13 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/MarkersTest.java
@@ -19,18 +19,13 @@
package org.apache.pulsar.common.protocol;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
import io.netty.buffer.ByteBuf;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
@@ -139,14 +134,6 @@ public void testTxnCommitMarker() throws IOException {
}
ByteBuf buf = Markers.newTxnCommitMarker(sequenceId, mostBits, leastBits, messageIdDataList);
- for (MessageIdData messageIdData : messageIdDataList) {
- try {
- messageIdData.recycle();
- fail("message id data should be recycled after create the marker bytebuf.");
- } catch (Exception e) {
- assertTrue(e instanceof java.lang.IllegalStateException);
- }
- }
MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
assertEquals(msgMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_COMMIT_VALUE);
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 7052a7402b4fa..e541a540d9ce7 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -91,6 +91,10 @@
io.grpc
grpc-all
+
+ io.grpc
+ grpc-protobuf-lite
+
com.google.protobuf
protobuf-java
@@ -109,6 +113,12 @@
grpc-all
+