diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java index 1b9fdea6dcf..86020c0fd27 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/stats/BroadCastStatsLogger.java @@ -167,6 +167,22 @@ public void removeScope(String scope, StatsLogger statsLogger) { first.removeScope(scope, another.first); second.removeScope(scope, another.second); } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } /** diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java index 624791cdd5a..f4860756514 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java @@ -197,6 +197,16 @@ public StatsLogger scope(String name) { @Override public void removeScope(String name, StatsLogger statsLogger) {} + + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } @Override diff --git a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java index 1b2beacf2da..f700e400a34 100644 --- a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java +++ b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleMetricsProvider.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,6 +135,7 @@ public void stop() { if (jmx != null) { jmx.stop(); } + ThreadRegistry.clear(); } @Override diff --git a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java index dba2e121f52..ade182d1c7f 100644 --- a/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java +++ b/bookkeeper-stats-providers/codahale-metrics-provider/src/main/java/org/apache/bookkeeper/stats/codahale/CodahaleStatsLogger.java @@ -109,4 +109,20 @@ public StatsLogger scope(String scope) { public void removeScope(String name, StatsLogger statsLogger) { // no-op. the codahale stats logger doesn't have the references for stats logger. } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java index 0c72d580712..403cb18ac85 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java @@ -55,7 +55,10 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { private final LongAdder successSumAdder = new LongAdder(); private final LongAdder failSumAdder = new LongAdder(); - private final Map labels; + private Map labels; + + // used for lazy registration for thread scoped metrics + private boolean threadInitialized; public DataSketchesOpStatsLogger(Map labels) { this.current = new ThreadLocalAccessor(); @@ -180,6 +183,15 @@ public Map getLabels() { return labels; } + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } + private static class LocalData { private final DoublesSketch successSketch = new DoublesSketchBuilder().build(); private final DoublesSketch failSketch = new DoublesSketchBuilder().build(); diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java index ddd278e3671..bf3ccae2a5c 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/LongAdderCounter.java @@ -30,7 +30,10 @@ public class LongAdderCounter implements Counter { private final LongAdder counter = new LongAdder(); - private final Map labels; + private Map labels; + + // used for lazy registration for thread scoped metric + private boolean threadInitialized; public LongAdderCounter(Map labels) { this.labels = labels; @@ -64,4 +67,13 @@ public Long get() { public Map getLabels() { return labels; } + + public boolean isThreadInitialized() { + return threadInitialized; + } + + public void initializeThread(Map labels) { + this.labels = labels; + this.threadInitialized = true; + } } diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java index 573df792f3b..c0b89814bab 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.eclipse.jetty.server.Server; @@ -82,6 +83,10 @@ public class PrometheusMetricsProvider implements StatsProvider { final ConcurrentMap counters = new ConcurrentHashMap<>(); final ConcurrentMap> gauges = new ConcurrentHashMap<>(); final ConcurrentMap opStats = new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedOpStats = + new ConcurrentHashMap<>(); + final ConcurrentMap threadScopedCounters = + new ConcurrentHashMap<>(); public PrometheusMetricsProvider() { this(CollectorRegistry.defaultRegistry); @@ -154,6 +159,8 @@ public void stop() { server.stop(); } catch (Exception e) { log.warn("Failed to shutdown Jetty server", e); + } finally { + ThreadRegistry.clear(); } } } diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java index dcc75270b71..35b8147ec50 100644 --- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusStatsLogger.java @@ -46,11 +46,22 @@ public OpStatsLogger getOpStatsLogger(String name) { return provider.opStats.computeIfAbsent(scopeContext(name), x -> new DataSketchesOpStatsLogger(labels)); } + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return provider.threadScopedOpStats.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedDataSketchesStatsLogger(provider, x, labels)); + } + @Override public Counter getCounter(String name) { return provider.counters.computeIfAbsent(scopeContext(name), x -> new LongAdderCounter(labels)); } + public Counter getThreadScopedCounter(String name) { + return provider.threadScopedCounters.computeIfAbsent(scopeContext(name), + x -> new ThreadScopedLongAdderCounter(provider, x, labels)); + } + @Override public void registerGauge(String name, Gauge gauge) { provider.gauges.computeIfAbsent(scopeContext(name), x -> new SimpleGauge(gauge, labels)); diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java new file mode 100644 index 00000000000..1598e55301f --- /dev/null +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.bookkeeper.stats.prometheus; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.stats.OpStatsData; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.ThreadRegistry; + +/** + * OpStatsLogger implementation that lazily registers OpStatsLoggers per thread + * with added labels for the threadpool/thresd name and thread no. + */ +public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger { + + private ThreadLocal statsLoggers; + private DataSketchesOpStatsLogger defaultStatsLogger; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedDataSketchesStatsLogger(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = labels; + this.defaultStatsLogger = new DataSketchesOpStatsLogger(labels); + + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultStatsLogger.initializeThread(defaultLabels); + + this.statsLoggers = ThreadLocal.withInitial(() -> { + return new DataSketchesOpStatsLogger(labels); + }); + } + + @Override + public void registerFailedEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerFailedEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) { + getStatsLogger().registerSuccessfulEvent(eventLatency, unit); + } + + @Override + public void registerSuccessfulValue(long value) { + getStatsLogger().registerSuccessfulValue(value); + } + + @Override + public void registerFailedValue(long value) { + getStatsLogger().registerFailedValue(value); + } + + @Override + public OpStatsData toOpStatsData() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + // Not relevant as we don't use JMX here + throw new UnsupportedOperationException(); + } + + private DataSketchesOpStatsLogger getStatsLogger() { + DataSketchesOpStatsLogger statsLogger = statsLoggers.get(); + + // Lazy registration + // Update the stats logger with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard OpsStatsLogger (defaultStatsLogger) + if (!statsLogger.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + if (tpt == null) { + statsLoggers.set(defaultStatsLogger); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger); + return defaultStatsLogger; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + statsLogger.initializeThread(threadScopedlabels); + provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger); + } + } + + return statsLogger; + } +} \ No newline at end of file diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java new file mode 100644 index 00000000000..59a657f3817 --- /dev/null +++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedLongAdderCounter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.bookkeeper.stats.prometheus; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.ThreadRegistry; + +/** + * {@link Counter} implementation that lazily registers LongAdderCounters per thread + * * with added labels for the threadpool/thread name and thread no. + */ +public class ThreadScopedLongAdderCounter implements Counter { + private ThreadLocal counters; + private LongAdderCounter defaultCounter; + private Map originalLabels; + private ScopeContext scopeContext; + private PrometheusMetricsProvider provider; + + public ThreadScopedLongAdderCounter(PrometheusMetricsProvider provider, + ScopeContext scopeContext, + Map labels) { + this.provider = provider; + this.scopeContext = scopeContext; + this.originalLabels = new HashMap<>(labels); + this.defaultCounter = new LongAdderCounter(labels); + Map defaultLabels = new HashMap<>(labels); + defaultLabels.put("threadPool", "?"); + defaultLabels.put("thread", "?"); + this.defaultCounter.initializeThread(defaultLabels); + + this.counters = ThreadLocal.withInitial(() -> { + return new LongAdderCounter(labels); + }); + } + + @Override + public void clear() { + getCounter().clear(); + } + + @Override + public void inc() { + getCounter().inc(); + } + + @Override + public void dec() { + getCounter().dec(); + } + + @Override + public void add(long delta) { + getCounter().add(delta); + } + + @Override + public Long get() { + return getCounter().get(); + } + + private LongAdderCounter getCounter() { + LongAdderCounter counter = counters.get(); + + // Lazy registration + // Update the counter with the thread labels then add to the provider + // If for some reason this thread did not get registered, + // then we fallback to a standard counter (defaultCounter) + if (!counter.isThreadInitialized()) { + ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); + + if (tpt == null) { + counters.set(defaultCounter); + provider.counters.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultCounter); + return defaultCounter; + } else { + Map threadScopedlabels = new HashMap<>(originalLabels); + threadScopedlabels.put("threadPool", tpt.getThreadPool()); + threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); + + counter.initializeThread(threadScopedlabels); + provider.counters.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), counter); + } + } + + return counter; + } +} diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java index 69b6d176675..d86e19a176f 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/CachingStatsLogger.java @@ -102,4 +102,20 @@ public StatsLogger scope(String name) { public void removeScope(String name, StatsLogger statsLogger) { scopeStatsLoggers.remove(name, statsLogger); } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java index 2d242c5acff..3e10b20c204 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/NullStatsLogger.java @@ -126,4 +126,14 @@ public StatsLogger scope(String name) { public void removeScope(String name, StatsLogger statsLogger) { // nop } + + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java index 3023d36118b..86c569c457e 100644 --- a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/StatsLogger.java @@ -28,6 +28,15 @@ public interface StatsLogger { */ OpStatsLogger getOpStatsLogger(String name); + /** + * @param name + * Stats Name + * @return Get the logger for an OpStat described by the name with extra + * labels for the threadpool/threadname and thread no. Lone threads always + * have 0 as their thread no. + */ + OpStatsLogger getThreadScopedOpStatsLogger(String name); + /** * @param name * Stats Name @@ -35,6 +44,15 @@ public interface StatsLogger { */ Counter getCounter(String name); + /** + * @param name + * Stats Name + * @return Get the logger for a simple stat described by the name with extra + * labels for the threadpool/threadname and thread no. Lone threads always + * have 0 as their thread no. + */ + Counter getThreadScopedCounter(String name); + /** * Register given gauge as name name. * diff --git a/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java new file mode 100644 index 00000000000..fccf30e6955 --- /dev/null +++ b/bookkeeper-stats/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.bookkeeper.stats; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * For mapping thread ids to thread pools and threads within those pools + * or just for lone named threads. Thread scoped metrics add labels to + * metrics by retrieving the ThreadPoolThread object from this registry. + * For flexibility, this registry is not based on TLS. + */ +public class ThreadRegistry { + private static ConcurrentMap threadPoolMap = new ConcurrentHashMap<>(); + + /* + Threads can register themselves as their first act before carrying out + any work. + */ + public static void register(String threadPool, int threadPoolThread) { + register(threadPool, threadPoolThread, Thread.currentThread().getId()); + } + + /* + Thread factories can register a thread by its id. + */ + public static void register(String threadPool, int threadPoolThread, long threadId) { + ThreadPoolThread tpt = new ThreadPoolThread(threadPool, threadPoolThread, threadId); + threadPoolMap.put(threadId, tpt); + } + + /* + Clears all stored thread state. + */ + public static void clear() { + threadPoolMap.clear(); + } + + /* + Retrieves the registered ThreadPoolThread (if registered) for the calling thread. + */ + public static ThreadPoolThread get() { + return threadPoolMap.get(Thread.currentThread().getId()); + } + + /** + * Stores the thread pool and ordinal. + */ + public static final class ThreadPoolThread { + final String threadPool; + final int ordinal; + final long threadId; + + public ThreadPoolThread(String threadPool, int ordinal, long threadId) { + this.threadPool = threadPool; + this.ordinal = ordinal; + this.threadId = threadId; + } + + public String getThreadPool() { + return threadPool; + } + + public int getOrdinal() { + return ordinal; + } + } +} diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java index 6ae269b4124..5c9d035d481 100644 --- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java +++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java @@ -167,6 +167,22 @@ public void removeScope(String scope, StatsLogger statsLogger) { first.removeScope(scope, another.first); second.removeScope(scope, another.second); } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public OpStatsLogger getThreadScopedOpStatsLogger(String name) { + return getOpStatsLogger(name); + } + + /** + Thread-scoped stats not currently supported. + */ + @Override + public Counter getThreadScopedCounter(String name) { + return getCounter(name); + } } /**