Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ public class GrpcServerMetrics extends RatisMetrics {
public static final String RATIS_GRPC_METRICS_REQUESTS_COUNT = "num_requests";
public static final String RATIS_GRPC_INSTALL_SNAPSHOT_COUNT = "num_install_snapshot";

private final LongCounter requestRetry;
private final LongCounter requestInstallSnapshot;
private final LongCounter requestRetry = getRegistry().counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT);
private final LongCounter requestInstallSnapshot = getRegistry().counter(RATIS_GRPC_INSTALL_SNAPSHOT_COUNT);

private final Function<Boolean, LongCounter> requestCreate;
private final Function<Boolean, LongCounter> requestCreate = newHeartbeatCounter(RATIS_GRPC_METRICS_REQUESTS_COUNT);

private final Map<String, Function<Boolean, LongCounter>> requestSuccess = new ConcurrentHashMap<>();
private final Map<String, Function<Boolean, LongCounter>> requestTimeout = new ConcurrentHashMap<>();
Expand All @@ -66,15 +66,10 @@ public class GrpcServerMetrics extends RatisMetrics {
private final Map<String, String> appendLogLatency = new ConcurrentHashMap<>();

public GrpcServerMetrics(String serverId) {
registry = getMetricRegistryForGrpcServer(serverId);

requestRetry = registry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT);
requestInstallSnapshot = registry.counter(RATIS_GRPC_INSTALL_SNAPSHOT_COUNT);

requestCreate = newHeartbeatCounter(RATIS_GRPC_METRICS_REQUESTS_COUNT);
super(createRegistry(serverId));
}

private RatisMetricRegistry getMetricRegistryForGrpcServer(String serverId) {
private static RatisMetricRegistry createRegistry(String serverId) {
return create(new MetricRegistryInfo(serverId,
RATIS_GRPC_METRICS_APP_NAME,
RATIS_GRPC_METRICS_COMP_NAME, RATIS_GRPC_METRICS_DESC));
Expand All @@ -84,7 +79,7 @@ public Timekeeper getGrpcLogAppenderLatencyTimer(String follower, boolean isHear
final Map<String, String> map = isHeartbeat ? heartbeatLatency : appendLogLatency;
final String name = map.computeIfAbsent(follower,
key -> String.format(RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY + getHeartbeatSuffix(isHeartbeat), key));
return registry.timer(name);
return getRegistry().timer(name);
}

public void onRequestRetry() {
Expand All @@ -105,15 +100,15 @@ public void onRequestSuccess(String follower, boolean isHeartbeat) {
}

private LongCounter newRequestNotLeader(String follower) {
return registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER, follower));
return getRegistry().counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER, follower));
}

public void onRequestNotLeader(String follower) {
requestNotLeader.computeIfAbsent(follower, this::newRequestNotLeader).inc();
}

private LongCounter newRequestInconsistency(String follower) {
return registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY, follower));
return getRegistry().counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY, follower));
}

public void onRequestInconsistency(String follower) {
Expand All @@ -130,20 +125,11 @@ public void onRequestTimeout(String follower, boolean isHeartbeat) {
}

public void addPendingRequestsCount(String follower, Supplier<Integer> pendinglogQueueSize) {
registry.gauge(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_PENDING_COUNT, follower), () -> pendinglogQueueSize);
final String name = String.format(RATIS_GRPC_METRICS_LOG_APPENDER_PENDING_COUNT, follower);
getRegistry().gauge(name, () -> pendinglogQueueSize);
}

public void onInstallSnapshot() {
requestInstallSnapshot.inc();
}

private Function<Boolean, LongCounter> newHeartbeatCounter(String prefix) {
final LongCounter trueCounter = registry.counter(prefix + getHeartbeatSuffix(true));
final LongCounter falseCounter = registry.counter(prefix + getHeartbeatSuffix(false));
return b -> b? trueCounter : falseCounter;
}

public static String getHeartbeatSuffix(boolean heartbeat) {
return heartbeat ? "_heartbeat" : "";
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -17,49 +17,72 @@
*/
package org.apache.ratis.grpc.metrics;

import org.apache.ratis.metrics.LongCounter;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class MessageMetrics extends RatisMetrics {
static final Logger LOG = LoggerFactory.getLogger(MessageMetrics.class);
public static final String GRPC_MESSAGE_METRICS = "%s_message_metrics";
public static final String GRPC_MESSAGE_METRICS_DESC = "Outbound/Inbound message counters";

private enum Type {
STARTED("_started_total"),
COMPLETED("_completed_total"),
RECEIVED("_received_executed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_received_executed -> _received_total or _received_executed_total

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codings-dan , I also want to change some of the metrics names but it may break some external metrics applications. So, we are better to leave them unchanged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks!


private final String suffix;

Type(String suffix) {
this.suffix = suffix;
}

String getSuffix() {
return suffix;
}
}

private final Map<Type, Map<String, LongCounter>> types;

public MessageMetrics(String endpointId, String endpointType) {
this.registry = create(
new MetricRegistryInfo(endpointId,
RATIS_APPLICATION_NAME_METRICS,
String.format(GRPC_MESSAGE_METRICS, endpointType),
GRPC_MESSAGE_METRICS_DESC)
);
super(createRegistry(endpointId, endpointType));
this.types = newCounterMaps(Type.class);
}

private static RatisMetricRegistry createRegistry(String endpointId, String endpointType) {
final String name = String.format(GRPC_MESSAGE_METRICS, endpointType);
return create(new MetricRegistryInfo(endpointId,
RATIS_APPLICATION_NAME_METRICS, name, GRPC_MESSAGE_METRICS_DESC));
}

private void inc(String metricNamePrefix, Type t) {
types.get(t)
.computeIfAbsent(metricNamePrefix, prefix -> getRegistry().counter(prefix + t.getSuffix()))
.inc();
}

/**
* Increments the count of RPCs that are started.
* Both client and server use this.
* @param rpcType
*/
public void rpcStarted(String rpcType){
registry.counter(rpcType + "_started_total").inc();
public void rpcStarted(String metricNamePrefix){
inc(metricNamePrefix, Type.STARTED);
}

/**
* Increments the count of RPCs that were started and got completed.
* Both client and server use this.
* @param rpcType
*/
public void rpcCompleted(String rpcType){
registry.counter(rpcType + "_completed_total").inc();
public void rpcCompleted(String metricNamePrefix){
inc(metricNamePrefix, Type.COMPLETED);
}

/**
* increments the count of RPCs recived on the server.
* @param rpcType
* Increments the count of RPCs received on the server.
*/
public void rpcReceived(String rpcType){
registry.counter(rpcType + "_received_executed").inc();
public void rpcReceived(String metricNamePrefix){
inc(metricNamePrefix, Type.RECEIVED);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

package org.apache.ratis.metrics;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,8 +33,28 @@ public class RatisMetrics {
static final Logger LOG = LoggerFactory.getLogger(RatisMetrics.class);
public static final String RATIS_APPLICATION_NAME_METRICS = "ratis";

@SuppressWarnings("VisibilityModifier")
protected RatisMetricRegistry registry;
public static String getHeartbeatSuffix(boolean heartbeat) {
return heartbeat ? "_heartbeat" : "";
}

private static <T> Function<Boolean, T> newHeartbeatFunction(String prefix, Function<String, T> function) {
final T trueValue = function.apply(prefix + getHeartbeatSuffix(true));
final T falseValue = function.apply(prefix + getHeartbeatSuffix(false));
return b -> b? trueValue: falseValue;
}

protected static <T extends Enum<T>> Map<T, Map<String, LongCounter>> newCounterMaps(Class<T> clazz) {
final EnumMap<T,Map<String, LongCounter>> maps = new EnumMap<>(clazz);
Arrays.stream(clazz.getEnumConstants()).forEach(t -> maps.put(t, new ConcurrentHashMap<>()));
return Collections.unmodifiableMap(maps);
}

protected static <T extends Enum<T>> Map<T, Timekeeper> newTimerMap(
Class<T> clazz, Function<T, Timekeeper> constructor) {
final EnumMap<T, Timekeeper> map = new EnumMap<>(clazz);
Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t, constructor.apply(t)));
return Collections.unmodifiableMap(map);
}

protected static RatisMetricRegistry create(MetricRegistryInfo info) {
Optional<RatisMetricRegistry> metricRegistry = MetricRegistries.global().get(info);
Expand All @@ -40,6 +66,12 @@ protected static RatisMetricRegistry create(MetricRegistryInfo info) {
});
}

private final RatisMetricRegistry registry;

protected RatisMetrics(RatisMetricRegistry registry) {
this.registry = registry;
}

public void unregister() {
MetricRegistryInfo info = registry.getMetricRegistryInfo();
if (LOG.isDebugEnabled()) {
Expand All @@ -51,11 +83,15 @@ public void unregister() {
}
}

public RatisMetricRegistry getRegistry() {
public final RatisMetricRegistry getRegistry() {
return registry;
}

protected Timekeeper getTimer(String timerName) {
return getRegistry().timer(timerName);
protected Function<Boolean, Timekeeper> newHeartbeatTimer(String prefix) {
return newHeartbeatFunction(prefix, getRegistry()::timer);
}

protected Function<Boolean, LongCounter> newHeartbeatCounter(String prefix) {
return newHeartbeatFunction(prefix, getRegistry()::counter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.function.Consumer;

import org.apache.ratis.metrics.MetricRegistries;
import org.apache.ratis.metrics.MetricRegistryFactory;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.util.TimeDuration;
Expand All @@ -44,15 +43,15 @@ public class MetricRegistriesImpl extends MetricRegistries {

private final List<Consumer<RatisMetricRegistry>> stopReporters = new CopyOnWriteArrayList<>();

private final MetricRegistryFactory factory;
private final MetricRegistryFactoryImpl factory;

private final RefCountingMap<MetricRegistryInfo, RatisMetricRegistry> registries;

public MetricRegistriesImpl() {
this(new MetricRegistryFactoryImpl());
}

public MetricRegistriesImpl(MetricRegistryFactory factory) {
MetricRegistriesImpl(MetricRegistryFactoryImpl factory) {
this.factory = factory;
this.registries = new RefCountingMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

import org.apache.ratis.metrics.MetricRegistryFactory;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;

public class MetricRegistryFactoryImpl implements MetricRegistryFactory {
@Override
public RatisMetricRegistry create(MetricRegistryInfo info) {
public RatisMetricRegistryImpl create(MetricRegistryInfo info) {
return new RatisMetricRegistryImpl(info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package org.apache.ratis.netty.metrics;

import org.apache.ratis.metrics.LongCounter;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
import org.apache.ratis.metrics.Timekeeper;

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class NettyServerStreamRpcMetrics extends RatisMetrics {
private static final String METRICS_APP_NAME = "ratis_netty";
Expand Down Expand Up @@ -88,11 +92,32 @@ public void stop(Timekeeper.Context context, boolean success) {
}
}

private enum Op {
Create(RequestType::getNumRequestsString),
Success(RequestType::getSuccessCountString),
Fail(RequestType::getFailCountString);

private final Function<RequestType, String> stringFunction;

Op(Function<RequestType, String> stringFunction) {
this.stringFunction = stringFunction;
}

String getString(RequestType type) {
return stringFunction.apply(type);
}
}

private final Map<String, Timekeeper> latencyTimers = new ConcurrentHashMap<>();
private final Map<Op, Map<String, LongCounter>> ops;

public NettyServerStreamRpcMetrics(String serverId) {
registry = getMetricRegistryForGrpcServer(serverId);
super(createRegistry(serverId));

this.ops = newCounterMaps(Op.class);
}

private RatisMetricRegistry getMetricRegistryForGrpcServer(String serverId) {
private static RatisMetricRegistry createRegistry(String serverId) {
return create(new MetricRegistryInfo(serverId,
METRICS_APP_NAME, METRICS_COMP_NAME, METRICS_DESC));
}
Expand All @@ -102,18 +127,22 @@ public RequestMetrics newRequestMetrics(RequestType type) {
}

public Timekeeper getLatencyTimer(RequestType type) {
return registry.timer(type.getLatencyString());
return latencyTimers.computeIfAbsent(type.getLatencyString(), getRegistry()::timer);
}

private void inc(Op op, RequestType type) {
ops.get(op).computeIfAbsent(op.getString(type), getRegistry()::counter).inc();
}

public void onRequestCreate(RequestType type) {
registry.counter(type.getNumRequestsString()).inc();
inc(Op.Create, type);
}

public void onRequestSuccess(RequestType type) {
registry.counter(type.getSuccessCountString()).inc();
inc(Op.Success, type);
}

public void onRequestFail(RequestType type) {
registry.counter(type.getFailCountString()).inc();
inc(Op.Fail, type);
}
}
Loading