From 79645fa7b2875d1b19b393fa91c109011cde6c40 Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Thu, 21 Oct 2021 10:21:38 +0200 Subject: [PATCH 1/3] Addition of thread-scoped stats The Counter and OpStatsLogger have new variants that add threadPool and thread labels to their metrics. These new variants can be obtained via new methods in the StatsLogger interface. --- .../common/stats/BroadCastStatsLogger.java | 16 +++ .../bookkeeper/test/TestStatsProvider.java | 10 ++ .../stats/codahale/CodahaleStatsLogger.java | 16 +++ .../prometheus/DataSketchesOpStatsLogger.java | 14 ++- .../stats/prometheus/LongAdderCounter.java | 14 ++- .../prometheus/PrometheusMetricsProvider.java | 4 + .../prometheus/PrometheusStatsLogger.java | 11 ++ .../ThreadScopedDataSketchesStatsLogger.java | 114 ++++++++++++++++++ .../ThreadScopedLongAdderCounter.java | 104 ++++++++++++++++ .../bookkeeper/stats/CachingStatsLogger.java | 16 +++ .../bookkeeper/stats/NullStatsLogger.java | 10 ++ .../apache/bookkeeper/stats/StatsLogger.java | 18 +++ .../bookkeeper/stats/ThreadRegistry.java | 76 ++++++++++++ .../common/stats/BroadCastStatsLogger.java | 16 +++ 14 files changed, 437 insertions(+), 2 deletions(-) create mode 100644 bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java create mode 100644 bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java create mode 100644 bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java index 1b9fdea6dcf..86020c0fd27 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java @@ -167,6 +167,22 @@ public void removeScope(String scope, StatsLogger statsLogger) { first.removeScope(scope, another.first); second.removeScope(scope, another.second); } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } /** diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java index 624791cdd5a..f4860756514 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java @@ -197,6 +197,16 @@ public StatsLogger scope(String name) { @Override public void removeScope(String name, StatsLogger statsLogger) {} + + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } @Override diff --git a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java index dba2e121f52..ade182d1c7f 100644 --- a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java +++ b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java @@ -109,4 +109,20 @@ public StatsLogger scope(String scope) { public void removeScope(String name, StatsLogger statsLogger) { // no-op. the codahale stats logger doesn't have the references for stats logger. } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java index 0c72d580712..403cb18ac85 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java @@ -55,7 +55,10 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { private final LongAdder successSumAdder = new LongAdder(); private final LongAdder failSumAdder = new LongAdder(); - private final Map labels; + private Map labels; + + // used for lazy registration for thread scoped metrics + private boolean threadInitialized; public DataSketchesOpStatsLogger(Map labels) { this.current = new ThreadLocalAccessor(); @@ -180,6 +183,15 @@ public Map getLabels() { return labels; } + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } + private static class LocalData { private final DoublesSketch successSketch = new DoublesSketchBuilder().build(); private final DoublesSketch failSketch = new DoublesSketchBuilder().build(); diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java index ddd278e3671..bf3ccae2a5c 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java @@ -30,7 +30,10 @@ public class LongAdderCounter implements Counter { private final LongAdder counter = new LongAdder(); - private final Map labels; + private Map labels; + + // used for lazy registration for thread scoped metric + private boolean threadInitialized; public LongAdderCounter(Map labels) { this.labels = labels; @@ -64,4 +67,13 @@ public Long get() { public Map getLabels() { return labels; } + + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } } diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java index 573df792f3b..26b055f92ce 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java @@ -82,6 +82,10 @@ public class PrometheusMetricsProvider implements StatsProvider { final ConcurrentMap counters = new ConcurrentHashMap<>(); final ConcurrentMap> gauges = new ConcurrentHashMap<>(); final ConcurrentMap opStats = new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedOpStats = + new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedCounters = + new ConcurrentHashMap<>(); public PrometheusMetricsProvider() { this(CollectorRegistry.defaultRegistry); diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java index dcc75270b71..d1ff7fd1f81 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java @@ -46,11 +46,22 @@ public OpStatsLogger getOpStatsLogger(String name) { return provider.opStats.computeIfAbsent(scopeContext(name), x -> new DataSketchesOpStatsLogger(labels)); } + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return provider.threadScopedOpStats.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedDataSketchesStatsLogger(provider, scopeContext(name), labels)); + } + @Override public Counter getCounter(String name) { return provider.counters.computeIfAbsent(scopeContext(name), x -> new LongAdderCounter(labels)); } + public Counter getThreadScopedCounter(String name) { + return provider.threadScopedCounters.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedLongAdderCounter(provider, scopeContext(name), labels)); + } + @Override public void registerGauge(String name, Gauge gauge) { provider.gauges.computeIfAbsent(scopeContext(name), x -> new SimpleGauge(gauge, labels)); diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java new file mode 100644 index 00000000000..1598e55301f --- /dev/null +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.bookkeeper.stats.prometheus; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.stats.OpStatsData; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.ThreadRegistry; + +/** + * OpStatsLogger implementation that lazily registers OpStatsLoggers per thread + * with added labels for the threadpool/thresd name and thread no. + */ +public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger { + + private ThreadLocal statsLoggers; + private DataSketchesOpStatsLogger defaultStatsLogger; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedDataSketchesStatsLogger(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = labels; + this.defaultStatsLogger = new DataSketchesOpStatsLogger(labels); + + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultStatsLogger.initializeThread(defaultLabels); + + this.statsLoggers = ThreadLocal.withInitial(() -> { + return new DataSketchesOpStatsLogger(labels); + }); + } + + @Override + public void registerFailedEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerFailedEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerSuccessfulEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulValue(long value) { + getStatsLogger().registerSuccessfulValue(value); + } + + @Override + public void registerFailedValue(long value) { + getStatsLogger().registerFailedValue(value); + } + + @Override + public OpStatsData toOpStatsData() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + private DataSketchesOpStatsLogger getStatsLogger() { + DataSketchesOpStatsLogger statsLogger = statsLoggers.get(); + + // Lazy registration + // Update the stats logger with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard OpsStatsLogger (defaultStatsLogger) + if (!statsLogger.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + if (tpt == null) { + statsLoggers.set(defaultStatsLogger); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger); + return defaultStatsLogger; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + statsLogger.initializeThread(threadScopedlabels); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger); + } + } + + return statsLogger; + } +} \ No newline at end of file diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java new file mode 100644 index 00000000000..59a657f3817 --- /dev/null +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.bookkeeper.stats.prometheus; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.ThreadRegistry; + +/** + * {@link Counter} implementation that lazily registers LongAdderCounters per thread + * * with added labels for the threadpool/thread name and thread no. + */ +public class ThreadScopedLongAdderCounter implements Counter { + private ThreadLocal counters; + private LongAdderCounter defaultCounter; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedLongAdderCounter(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = new HashMap<>(labels); + this.defaultCounter = new LongAdderCounter(labels); + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultCounter.initializeThread(defaultLabels); + + this.counters = ThreadLocal.withInitial(() -> { + return new LongAdderCounter(labels); + }); + } + + @Override + public void clear() { + getCounter().clear(); + } + + @Override + public void inc() { + getCounter().inc(); + } + + @Override + public void dec() { + getCounter().dec(); + } + + @Override + public void add(long delta) { + getCounter().add(delta); + } + + @Override + public Long get() { + return getCounter().get(); + } + + private LongAdderCounter getCounter() { + LongAdderCounter counter = counters.get(); + + // Lazy registration + // Update the counter with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard counter (defaultCounter) + if (!counter.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + + if (tpt == null) { + counters.set(defaultCounter); + provider.counters.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultCounter); + return defaultCounter; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + counter.initializeThread(threadScopedlabels); + provider.counters.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), counter); + } + } + + return counter; + } +} diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java index 69b6d176675..d86e19a176f 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java @@ -102,4 +102,20 @@ public StatsLogger scope(String name) { public void removeScope(String name, StatsLogger statsLogger) { scopeStatsLoggers.remove(name, statsLogger); } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java index 2d242c5acff..3e10b20c204 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java @@ -126,4 +126,14 @@ public StatsLogger scope(String name) { public void removeScope(String name, StatsLogger statsLogger) { // nop } + + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java index 3023d36118b..86c569c457e 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java @@ -28,6 +28,15 @@ public interface StatsLogger { */ OpStatsLogger getOpStatsLogger(String name); + /** + * @param name + * Stats Name + * @return Get the logger for an OpStat described by the name with extra + * labels for the threadpool/threadname and thread no. Lone threads always + * have 0 as their thread no. + */ + OpStatsLogger getThreadScopedOpStatsLogger(String name); + /** * @param name * Stats Name @@ -35,6 +44,15 @@ public interface StatsLogger { */ Counter getCounter(String name); + /** + * @param name + * Stats Name + * @return Get the logger for a simple stat described by the name with extra + * labels for the threadpool/threadname and thread no. Lone threads always + * have 0 as their thread no. + */ + Counter getThreadScopedCounter(String name); + /** * Register given gauge as name name. * diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java new file mode 100644 index 00000000000..66e22af2b3d --- /dev/null +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.bookkeeper.stats; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * For mapping thread ids to thread pools and threads within those pools + * or just for lone named threads. Thread scoped metrics add labels to + * metrics by retrieving the ThreadPoolThread object from this registry. + * For flexibility, this registry is not based on TLS. + */ +public class ThreadRegistry { + private static ConcurrentMap threadPoolMap = new ConcurrentHashMap<>(); + + /* + Threads can register themselves as their first act before carrying out + any work. + */ + public static void register(String threadPool, int threadPoolThread) { + register(threadPool, threadPoolThread, Thread.currentThread().getId()); + } + + /* + Thread factories can register a thread by its id. + */ + public static void register(String threadPool, int threadPoolThread, long threadId) { + ThreadPoolThread tpt = new ThreadPoolThread(threadPool, threadPoolThread, threadId); + threadPoolMap.put(threadId, tpt); + } + + /* + Retrieves the registered ThreadPoolThread (if registered) for the calling thread. + */ + public static ThreadPoolThread get() { + return threadPoolMap.get(Thread.currentThread().getId()); + } + + /** + * Stores the thread pool and ordinal. + */ + public static class ThreadPoolThread { + String threadPool; + int ordinal; + long threadId; + + public ThreadPoolThread(String threadPool, int ordinal, long threadId) { + this.threadPool = threadPool; + this.ordinal = ordinal; + this.threadId = threadId; + } + + public String getThreadPool() { + return threadPool; + } + + public int getOrdinal() { + return ordinal; + } + } +} diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java index 6ae269b4124..5c9d035d481 100644 --- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java +++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java @@ -167,6 +167,22 @@ public void removeScope(String scope, StatsLogger statsLogger) { first.removeScope(scope, another.first); second.removeScope(scope, another.second); } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } /** From 07563ba785d3f9073641a3c42a99e55afafd8dd4 Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Fri, 22 Oct 2021 12:22:02 +0200 Subject: [PATCH 2/3] ThreadRegistry clean up --- .../stats/codahale/CodahaleMetricsProvider.java | 2 ++ .../prometheus/PrometheusMetricsProvider.java | 3 +++ .../apache/bookkeeper/stats/ThreadRegistry.java | 15 +++++++++++---- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java index 1b2beacf2da..f700e400a34 100644 --- a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java +++ b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,6 +135,7 @@ public void stop() { if (jmx != null) { jmx.stop(); } + ThreadRegistry.clear(); } @Override diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java index 26b055f92ce..c0b89814bab 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.eclipse.jetty.server.Server; @@ -158,6 +159,8 @@ public void stop() { server.stop(); } catch (Exception e) { log.warn("Failed to shutdown Jetty server", e); + } finally { + ThreadRegistry.clear(); } } } diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java index 66e22af2b3d..fccf30e6955 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java @@ -44,6 +44,13 @@ public static void register(String threadPool, int threadPoolThread, long thread threadPoolMap.put(threadId, tpt); } + /* + Clears all stored thread state. + */ + public static void clear() { + threadPoolMap.clear(); + } + /* Retrieves the registered ThreadPoolThread (if registered) for the calling thread. */ @@ -54,10 +61,10 @@ public static ThreadPoolThread get() { /** * Stores the thread pool and ordinal. */ - public static class ThreadPoolThread { - String threadPool; - int ordinal; - long threadId; + public static final class ThreadPoolThread { + final String threadPool; + final int ordinal; + final long threadId; public ThreadPoolThread(String threadPool, int ordinal, long threadId) { this.threadPool = threadPool; From 2653bd4748fc6d7b861797d1b5ca5daddba5c613 Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Fri, 22 Oct 2021 12:22:50 +0200 Subject: [PATCH 3/3] Change to make lamba non-capturing --- .../bookkeeper/stats/prometheus/PrometheusStatsLogger.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java index d1ff7fd1f81..35b8147ec50 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java @@ -49,7 +49,7 @@ public OpStatsLogger getOpStatsLogger(String name) { @Override public OpStatsLogger getThreadScopedOpStatsLogger(String name) { return provider.threadScopedOpStats.computeIfAbsent(scopeContext(name), - x -> new ThreadScopedDataSketchesStatsLogger(provider, scopeContext(name), labels)); + x -> new ThreadScopedDataSketchesStatsLogger(provider, x, labels)); } @Override @@ -59,7 +59,7 @@ public Counter getCounter(String name) { public Counter getThreadScopedCounter(String name) { return provider.threadScopedCounters.computeIfAbsent(scopeContext(name), - x -> new ThreadScopedLongAdderCounter(provider, scopeContext(name), labels)); + x -> new ThreadScopedLongAdderCounter(provider, x, labels)); } @Override