Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ public void removeScope(String scope, StatsLogger statsLogger) {
first.removeScope(scope, another.first);
second.removeScope(scope, another.second);
}

/**
Thread-scoped stats not currently supported.
*/
@Override
public OpStatsLogger getThreadScopedOpStatsLogger(String name) {
return getOpStatsLogger(name);
}

/**
Thread-scoped stats not currently supported.
*/
@Override
public Counter getThreadScopedCounter(String name) {
return getCounter(name);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ public StatsLogger scope(String name) {

@Override
public void removeScope(String name, StatsLogger statsLogger) {}

@Override
public OpStatsLogger getThreadScopedOpStatsLogger(String name) {
return getOpStatsLogger(name);
}

@Override
public Counter getThreadScopedCounter(String name) {
return getCounter(name);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -134,6 +135,7 @@ public void stop() {
if (jmx != null) {
jmx.stop();
}
ThreadRegistry.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,20 @@ public StatsLogger scope(String scope) {
public void removeScope(String name, StatsLogger statsLogger) {
// no-op. the codahale stats logger doesn't have the references for stats logger.
}

/**
Thread-scoped stats not currently supported.
*/
@Override
public OpStatsLogger getThreadScopedOpStatsLogger(String name) {
return getOpStatsLogger(name);
}

/**
Thread-scoped stats not currently supported.
*/
@Override
public Counter getThreadScopedCounter(String name) {
return getCounter(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger {
private final LongAdder successSumAdder = new LongAdder();
private final LongAdder failSumAdder = new LongAdder();

private final Map<String, String> labels;
private Map<String, String> labels;

// used for lazy registration for thread scoped metrics
private boolean threadInitialized;

public DataSketchesOpStatsLogger(Map<String, String> labels) {
this.current = new ThreadLocalAccessor();
Expand Down Expand Up @@ -180,6 +183,15 @@ public Map<String, String> getLabels() {
return labels;
}

public boolean isThreadInitialized() {
return threadInitialized;
}

public void initializeThread(Map<String, String> labels) {
this.labels = labels;
this.threadInitialized = true;
}

private static class LocalData {
private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
public class LongAdderCounter implements Counter {
private final LongAdder counter = new LongAdder();

private final Map<String, String> labels;
private Map<String, String> labels;

// used for lazy registration for thread scoped metric
private boolean threadInitialized;

public LongAdderCounter(Map<String, String> labels) {
this.labels = labels;
Expand Down Expand Up @@ -64,4 +67,13 @@ public Long get() {
public Map<String, String> getLabels() {
return labels;
}

public boolean isThreadInitialized() {
return threadInitialized;
}

public void initializeThread(Map<String, String> labels) {
this.labels = labels;
this.threadInitialized = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -82,6 +83,10 @@ public class PrometheusMetricsProvider implements StatsProvider {
final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
new ConcurrentHashMap<>();
final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
new ConcurrentHashMap<>();

public PrometheusMetricsProvider() {
this(CollectorRegistry.defaultRegistry);
Expand Down Expand Up @@ -154,6 +159,8 @@ public void stop() {
server.stop();
} catch (Exception e) {
log.warn("Failed to shutdown Jetty server", e);
} finally {
ThreadRegistry.clear();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,22 @@ public OpStatsLogger getOpStatsLogger(String name) {
return provider.opStats.computeIfAbsent(scopeContext(name), x -> new DataSketchesOpStatsLogger(labels));
}

@Override
public OpStatsLogger getThreadScopedOpStatsLogger(String name) {
return provider.threadScopedOpStats.computeIfAbsent(scopeContext(name),
x -> new ThreadScopedDataSketchesStatsLogger(provider, x, labels));
}

@Override
public Counter getCounter(String name) {
return provider.counters.computeIfAbsent(scopeContext(name), x -> new LongAdderCounter(labels));
}

public Counter getThreadScopedCounter(String name) {
return provider.threadScopedCounters.computeIfAbsent(scopeContext(name),
x -> new ThreadScopedLongAdderCounter(provider, x, labels));
}

@Override
public <T extends Number> void registerGauge(String name, Gauge<T> gauge) {
provider.gauges.computeIfAbsent(scopeContext(name), x -> new SimpleGauge<T>(gauge, labels));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.bookkeeper.stats.prometheus;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.stats.OpStatsData;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;

/**
* OpStatsLogger implementation that lazily registers OpStatsLoggers per thread
* with added labels for the threadpool/thresd name and thread no.
*/
public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger {

private ThreadLocal<DataSketchesOpStatsLogger> statsLoggers;
private DataSketchesOpStatsLogger defaultStatsLogger;
private Map<String, String> originalLabels;
private ScopeContext scopeContext;
private PrometheusMetricsProvider provider;

public ThreadScopedDataSketchesStatsLogger(PrometheusMetricsProvider provider,
ScopeContext scopeContext,
Map<String, String> labels) {
this.provider = provider;
this.scopeContext = scopeContext;
this.originalLabels = labels;
this.defaultStatsLogger = new DataSketchesOpStatsLogger(labels);

Map<String, String> defaultLabels = new HashMap<>(labels);
defaultLabels.put("threadPool", "?");
defaultLabels.put("thread", "?");
this.defaultStatsLogger.initializeThread(defaultLabels);

this.statsLoggers = ThreadLocal.withInitial(() -> {
return new DataSketchesOpStatsLogger(labels);
});
}

@Override
public void registerFailedEvent(long eventLatency, TimeUnit unit) {
getStatsLogger().registerFailedEvent(eventLatency, unit);
}

@Override
public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
getStatsLogger().registerSuccessfulEvent(eventLatency, unit);
}

@Override
public void registerSuccessfulValue(long value) {
getStatsLogger().registerSuccessfulValue(value);
}

@Override
public void registerFailedValue(long value) {
getStatsLogger().registerFailedValue(value);
}

@Override
public OpStatsData toOpStatsData() {
// Not relevant as we don't use JMX here
throw new UnsupportedOperationException();
}

@Override
public void clear() {
// Not relevant as we don't use JMX here
throw new UnsupportedOperationException();
}

private DataSketchesOpStatsLogger getStatsLogger() {
DataSketchesOpStatsLogger statsLogger = statsLoggers.get();

// Lazy registration
// Update the stats logger with the thread labels then add to the provider
// If for some reason this thread did not get registered,
// then we fallback to a standard OpsStatsLogger (defaultStatsLogger)
if (!statsLogger.isThreadInitialized()) {
ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get();
if (tpt == null) {
statsLoggers.set(defaultStatsLogger);
provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger);
return defaultStatsLogger;
} else {
Map<String, String> threadScopedlabels = new HashMap<>(originalLabels);
threadScopedlabels.put("threadPool", tpt.getThreadPool());
threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal()));

statsLogger.initializeThread(threadScopedlabels);
provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger);
}
}

return statsLogger;
}
}
Loading