From b6da0508a938841b6107af6b380ad873a46f44df Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 10 Sep 2022 19:10:55 +0800 Subject: [PATCH 1/2] RATIS-1704. Fix SuppressWarnings("VisibilityModifier") in RatisMetrics. --- .../ratis/grpc/metrics/GrpcServerMetrics.java | 34 +++----- .../ratis/grpc/metrics/MessageMetrics.java | 50 +++++++---- .../apache/ratis/metrics/RatisMetrics.java | 39 +++++++-- .../metrics/impl/MetricRegistriesImpl.java | 5 +- .../impl/MetricRegistryFactoryImpl.java | 3 +- .../metrics/NettyServerStreamRpcMetrics.java | 41 ++++++++-- .../server/impl/StateMachineMetrics.java | 13 +-- .../server/metrics/LeaderElectionMetrics.java | 23 ++++-- .../server/metrics/LogAppenderMetrics.java | 10 +-- .../server/metrics/RaftLogMetricsBase.java | 13 ++- .../server/metrics/RaftServerMetricsImpl.java | 82 ++++++++----------- .../metrics/SegmentedRaftLogMetrics.java | 60 ++++++++------ .../ratis/grpc/TestRaftServerWithGrpc.java | 12 +-- 13 files changed, 232 insertions(+), 153 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java index 1ffe2ee4a1..785e308d70 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java @@ -51,10 +51,10 @@ public class GrpcServerMetrics extends RatisMetrics { public static final String RATIS_GRPC_METRICS_REQUESTS_COUNT = "num_requests"; public static final String RATIS_GRPC_INSTALL_SNAPSHOT_COUNT = "num_install_snapshot"; - private final LongCounter requestRetry; - private final LongCounter requestInstallSnapshot; + private final LongCounter requestRetry = getRegistry().counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT); + private final LongCounter requestInstallSnapshot = getRegistry().counter(RATIS_GRPC_INSTALL_SNAPSHOT_COUNT); - private final Function requestCreate; + private final Function requestCreate = newHeartbeatCounter(RATIS_GRPC_METRICS_REQUESTS_COUNT); private final Map> requestSuccess = new ConcurrentHashMap<>(); private final Map> requestTimeout = new ConcurrentHashMap<>(); @@ -66,15 +66,10 @@ public class GrpcServerMetrics extends RatisMetrics { private final Map appendLogLatency = new ConcurrentHashMap<>(); public GrpcServerMetrics(String serverId) { - registry = getMetricRegistryForGrpcServer(serverId); - - requestRetry = registry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT); - requestInstallSnapshot = registry.counter(RATIS_GRPC_INSTALL_SNAPSHOT_COUNT); - - requestCreate = newHeartbeatCounter(RATIS_GRPC_METRICS_REQUESTS_COUNT); + super(createRegistry(serverId)); } - private RatisMetricRegistry getMetricRegistryForGrpcServer(String serverId) { + private static RatisMetricRegistry createRegistry(String serverId) { return create(new MetricRegistryInfo(serverId, RATIS_GRPC_METRICS_APP_NAME, RATIS_GRPC_METRICS_COMP_NAME, RATIS_GRPC_METRICS_DESC)); @@ -84,7 +79,7 @@ public Timekeeper getGrpcLogAppenderLatencyTimer(String follower, boolean isHear final Map map = isHeartbeat ? heartbeatLatency : appendLogLatency; final String name = map.computeIfAbsent(follower, key -> String.format(RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY + getHeartbeatSuffix(isHeartbeat), key)); - return registry.timer(name); + return getRegistry().timer(name); } public void onRequestRetry() { @@ -105,7 +100,7 @@ public void onRequestSuccess(String follower, boolean isHeartbeat) { } private LongCounter newRequestNotLeader(String follower) { - return registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER, follower)); + return getRegistry().counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER, follower)); } public void onRequestNotLeader(String follower) { @@ -113,7 +108,7 @@ public void onRequestNotLeader(String follower) { } private LongCounter newRequestInconsistency(String follower) { - return registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY, follower)); + return getRegistry().counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY, follower)); } public void onRequestInconsistency(String follower) { @@ -130,20 +125,11 @@ public void onRequestTimeout(String follower, boolean isHeartbeat) { } public void addPendingRequestsCount(String follower, Supplier pendinglogQueueSize) { - registry.gauge(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_PENDING_COUNT, follower), () -> pendinglogQueueSize); + final String name = String.format(RATIS_GRPC_METRICS_LOG_APPENDER_PENDING_COUNT, follower); + getRegistry().gauge(name, () -> pendinglogQueueSize); } public void onInstallSnapshot() { requestInstallSnapshot.inc(); } - - private Function newHeartbeatCounter(String prefix) { - final LongCounter trueCounter = registry.counter(prefix + getHeartbeatSuffix(true)); - final LongCounter falseCounter = registry.counter(prefix + getHeartbeatSuffix(false)); - return b -> b? trueCounter : falseCounter; - } - - public static String getHeartbeatSuffix(boolean heartbeat) { - return heartbeat ? "_heartbeat" : ""; - } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java index 4056c7ae7d..2ff53c7b85 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,49 +17,71 @@ */ package org.apache.ratis.grpc.metrics; +import org.apache.ratis.metrics.LongCounter; import org.apache.ratis.metrics.MetricRegistryInfo; import org.apache.ratis.metrics.RatisMetrics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.util.Map; public class MessageMetrics extends RatisMetrics { - static final Logger LOG = LoggerFactory.getLogger(MessageMetrics.class); public static final String GRPC_MESSAGE_METRICS = "%s_message_metrics"; public static final String GRPC_MESSAGE_METRICS_DESC = "Outbound/Inbound message counters"; + private enum Type { + STARTED("_started_total"), + COMPLETED("_completed_total"), + RECEIVED("_received_executed"); + + private final String suffix; + + Type(String suffix) { + this.suffix = suffix; + } + + String getSuffix() { + return suffix; + } + } + + private final Map> types; + public MessageMetrics(String endpointId, String endpointType) { - this.registry = create( + super(create( new MetricRegistryInfo(endpointId, RATIS_APPLICATION_NAME_METRICS, String.format(GRPC_MESSAGE_METRICS, endpointType), GRPC_MESSAGE_METRICS_DESC) - ); + )); + + this.types = newCounterMaps(Type.class); + } + + private void inc(Type t, String rpcType) { + types.get(t) + .computeIfAbsent(rpcType, key -> getRegistry().counter(key + t.getSuffix())) + .inc(); } /** * Increments the count of RPCs that are started. * Both client and server use this. - * @param rpcType */ public void rpcStarted(String rpcType){ - registry.counter(rpcType + "_started_total").inc(); + inc(Type.STARTED, rpcType); } /** * Increments the count of RPCs that were started and got completed. * Both client and server use this. - * @param rpcType */ public void rpcCompleted(String rpcType){ - registry.counter(rpcType + "_completed_total").inc(); + inc(Type.COMPLETED, rpcType); } /** - * increments the count of RPCs recived on the server. - * @param rpcType + * Increments the count of RPCs received on the server. */ public void rpcReceived(String rpcType){ - registry.counter(rpcType + "_received_executed").inc(); + inc(Type.RECEIVED, rpcType); } - } diff --git a/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java b/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java index 2793797a11..a6ae90d779 100644 --- a/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java +++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java @@ -18,7 +18,13 @@ package org.apache.ratis.metrics; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +33,21 @@ public class RatisMetrics { static final Logger LOG = LoggerFactory.getLogger(RatisMetrics.class); public static final String RATIS_APPLICATION_NAME_METRICS = "ratis"; - @SuppressWarnings("VisibilityModifier") - protected RatisMetricRegistry registry; + public static String getHeartbeatSuffix(boolean heartbeat) { + return heartbeat ? "_heartbeat" : ""; + } + + private static Function newHeartbeatFunction(String prefix, Function function) { + final T trueValue = function.apply(prefix + getHeartbeatSuffix(true)); + final T falseValue = function.apply(prefix + getHeartbeatSuffix(false)); + return b -> b? trueValue: falseValue; + } + + protected static > Map> newCounterMaps(Class clazz) { + final EnumMap> maps = new EnumMap<>(clazz); + Arrays.stream(clazz.getEnumConstants()).forEach(t -> maps.put(t, new ConcurrentHashMap<>())); + return Collections.unmodifiableMap(maps); + } protected static RatisMetricRegistry create(MetricRegistryInfo info) { Optional metricRegistry = MetricRegistries.global().get(info); @@ -40,6 +59,12 @@ protected static RatisMetricRegistry create(MetricRegistryInfo info) { }); } + private final RatisMetricRegistry registry; + + protected RatisMetrics(RatisMetricRegistry registry) { + this.registry = registry; + } + public void unregister() { MetricRegistryInfo info = registry.getMetricRegistryInfo(); if (LOG.isDebugEnabled()) { @@ -51,11 +76,15 @@ public void unregister() { } } - public RatisMetricRegistry getRegistry() { + public final RatisMetricRegistry getRegistry() { return registry; } - protected Timekeeper getTimer(String timerName) { - return getRegistry().timer(timerName); + protected Function newHeartbeatTimer(String prefix) { + return newHeartbeatFunction(prefix, getRegistry()::timer); + } + + protected Function newHeartbeatCounter(String prefix) { + return newHeartbeatFunction(prefix, getRegistry()::counter); } } diff --git a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java index f4ec686fdf..e4822a968c 100644 --- a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java +++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java @@ -26,7 +26,6 @@ import java.util.function.Consumer; import org.apache.ratis.metrics.MetricRegistries; -import org.apache.ratis.metrics.MetricRegistryFactory; import org.apache.ratis.metrics.MetricRegistryInfo; import org.apache.ratis.metrics.RatisMetricRegistry; import org.apache.ratis.util.TimeDuration; @@ -44,7 +43,7 @@ public class MetricRegistriesImpl extends MetricRegistries { private final List> stopReporters = new CopyOnWriteArrayList<>(); - private final MetricRegistryFactory factory; + private final MetricRegistryFactoryImpl factory; private final RefCountingMap registries; @@ -52,7 +51,7 @@ public MetricRegistriesImpl() { this(new MetricRegistryFactoryImpl()); } - public MetricRegistriesImpl(MetricRegistryFactory factory) { + MetricRegistriesImpl(MetricRegistryFactoryImpl factory) { this.factory = factory; this.registries = new RefCountingMap<>(); } diff --git a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java index ae33e62566..124467d58d 100644 --- a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java +++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java @@ -19,11 +19,10 @@ import org.apache.ratis.metrics.MetricRegistryFactory; import org.apache.ratis.metrics.MetricRegistryInfo; -import org.apache.ratis.metrics.RatisMetricRegistry; public class MetricRegistryFactoryImpl implements MetricRegistryFactory { @Override - public RatisMetricRegistry create(MetricRegistryInfo info) { + public RatisMetricRegistryImpl create(MetricRegistryInfo info) { return new RatisMetricRegistryImpl(info); } } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java index a613acb9e4..cdcc245ef4 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java @@ -17,12 +17,16 @@ */ package org.apache.ratis.netty.metrics; +import org.apache.ratis.metrics.LongCounter; import org.apache.ratis.metrics.MetricRegistryInfo; import org.apache.ratis.metrics.RatisMetricRegistry; import org.apache.ratis.metrics.RatisMetrics; import org.apache.ratis.metrics.Timekeeper; import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; public class NettyServerStreamRpcMetrics extends RatisMetrics { private static final String METRICS_APP_NAME = "ratis_netty"; @@ -88,11 +92,32 @@ public void stop(Timekeeper.Context context, boolean success) { } } + private enum Op { + Create(RequestType::getNumRequestsString), + Success(RequestType::getSuccessCountString), + Fail(RequestType::getFailCountString); + + private final Function stringFunction; + + Op(Function stringFunction) { + this.stringFunction = stringFunction; + } + + String getString(RequestType type) { + return stringFunction.apply(type); + } + } + + private final Map latencyTimers = new ConcurrentHashMap<>(); + private final Map> ops; + public NettyServerStreamRpcMetrics(String serverId) { - registry = getMetricRegistryForGrpcServer(serverId); + super(getMetricRegistryForGrpcServer(serverId)); + + this.ops = newCounterMaps(Op.class); } - private RatisMetricRegistry getMetricRegistryForGrpcServer(String serverId) { + private static RatisMetricRegistry getMetricRegistryForGrpcServer(String serverId) { return create(new MetricRegistryInfo(serverId, METRICS_APP_NAME, METRICS_COMP_NAME, METRICS_DESC)); } @@ -102,18 +127,22 @@ public RequestMetrics newRequestMetrics(RequestType type) { } public Timekeeper getLatencyTimer(RequestType type) { - return registry.timer(type.getLatencyString()); + return latencyTimers.computeIfAbsent(type.getLatencyString(), getRegistry()::timer); + } + + private void inc(Op op, RequestType type) { + ops.get(op).computeIfAbsent(op.getString(type), getRegistry()::counter).inc(); } public void onRequestCreate(RequestType type) { - registry.counter(type.getNumRequestsString()).inc(); + inc(Op.Create, type); } public void onRequestSuccess(RequestType type) { - registry.counter(type.getSuccessCountString()).inc(); + inc(Op.Success, type); } public void onRequestFail(RequestType type) { - registry.counter(type.getFailCountString()).inc(); + inc(Op.Fail, type); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java index 690faad47c..2b316977d9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java @@ -52,21 +52,24 @@ public static StateMachineMetrics getStateMachineMetrics( return new StateMachineMetrics(serverId, getApplied, getApplyCompleted); } + private final Timekeeper takeSnapshotTimer = getRegistry().timer(STATEMACHINE_TAKE_SNAPSHOT_TIMER); + private StateMachineMetrics(String serverId, LongSupplier getApplied, LongSupplier getApplyCompleted) { - registry = getMetricRegistryForStateMachine(serverId); - registry.gauge(STATEMACHINE_APPLIED_INDEX_GAUGE, () -> getApplied::getAsLong); - registry.gauge(STATEMACHINE_APPLY_COMPLETED_GAUGE, () -> getApplyCompleted::getAsLong); + super(createRegistry(serverId)); + + getRegistry().gauge(STATEMACHINE_APPLIED_INDEX_GAUGE, () -> getApplied::getAsLong); + getRegistry().gauge(STATEMACHINE_APPLY_COMPLETED_GAUGE, () -> getApplyCompleted::getAsLong); } - private RatisMetricRegistry getMetricRegistryForStateMachine(String serverId) { + private static RatisMetricRegistry createRegistry(String serverId) { return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS, RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC)); } public Timekeeper getTakeSnapshotTimer() { - return registry.timer(STATEMACHINE_TAKE_SNAPSHOT_TIMER); + return takeSnapshotTimer; } } \ No newline at end of file diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java index 77083106f7..9216aaeb35 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.metrics; +import org.apache.ratis.metrics.LongCounter; import org.apache.ratis.metrics.MetricRegistryInfo; import org.apache.ratis.metrics.RatisMetricRegistry; import org.apache.ratis.metrics.RatisMetrics; @@ -43,12 +44,20 @@ public final class LeaderElectionMetrics extends RatisMetrics { public static final String TRANSFER_LEADERSHIP_COUNT_METRIC = "transferLeadershipCount"; public static final String LAST_LEADER_ELECTION_ELAPSED_TIME = "lastLeaderElectionElapsedTime"; + + private final LongCounter electionCount = getRegistry().counter(LEADER_ELECTION_COUNT_METRIC); + private final LongCounter timeoutCount = getRegistry().counter(LEADER_ELECTION_TIMEOUT_COUNT_METRIC); + private final LongCounter transferLeadershipCount = getRegistry().counter(TRANSFER_LEADERSHIP_COUNT_METRIC); + + private final Timekeeper electionTime = getRegistry().timer(LEADER_ELECTION_TIME_TAKEN); + private volatile Timestamp lastElectionTime; private LeaderElectionMetrics(RaftGroupMemberId serverId, LongSupplier getLastLeaderElapsedTimeMs) { - this.registry = getMetricRegistryForLeaderElection(serverId); - registry.gauge(LAST_LEADER_ELAPSED_TIME, () -> getLastLeaderElapsedTimeMs::getAsLong); - registry.gauge(LAST_LEADER_ELECTION_ELAPSED_TIME, + super(getMetricRegistryForLeaderElection(serverId)); + + getRegistry().gauge(LAST_LEADER_ELAPSED_TIME, () -> getLastLeaderElapsedTimeMs::getAsLong); + getRegistry().gauge(LAST_LEADER_ELECTION_ELAPSED_TIME, () -> () -> Optional.ofNullable(lastElectionTime).map(Timestamp::elapsedTimeMs).orElse(-1L)); } @@ -64,19 +73,19 @@ public static LeaderElectionMetrics getLeaderElectionMetrics( } public void onNewLeaderElectionCompletion() { - registry.counter(LEADER_ELECTION_COUNT_METRIC).inc(); + electionCount.inc(); lastElectionTime = Timestamp.currentTime(); } public Timekeeper getLeaderElectionTimer() { - return registry.timer(LEADER_ELECTION_TIME_TAKEN); + return electionTime; } public void onLeaderElectionTimeout() { - registry.counter(LEADER_ELECTION_TIMEOUT_COUNT_METRIC).inc(); + timeoutCount.inc(); } public void onTransferLeadership() { - registry.counter(TRANSFER_LEADERSHIP_COUNT_METRIC).inc(); + transferLeadershipCount.inc(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java index a8f03ecff9..f38d1b1c96 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java @@ -36,10 +36,10 @@ public final class LogAppenderMetrics extends RatisMetrics { public static final String FOLLOWER_RPC_RESP_TIME = "follower_%s_rpc_response_time"; public LogAppenderMetrics(RaftGroupMemberId groupMemberId) { - registry = getMetricRegistryForLogAppender(groupMemberId.toString()); + super(createRegistry(groupMemberId.toString())); } - private RatisMetricRegistry getMetricRegistryForLogAppender(String serverId) { + private static RatisMetricRegistry createRegistry(String serverId) { return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_APPENDER_METRICS, RATIS_LOG_APPENDER_METRICS_DESC)); @@ -47,8 +47,8 @@ private RatisMetricRegistry getMetricRegistryForLogAppender(String serverId) { public void addFollowerGauges(RaftPeerId id, LongSupplier getNextIndex, LongSupplier getMatchIndex, Supplier getLastRpcTime) { - registry.gauge(String.format(FOLLOWER_NEXT_INDEX, id), () -> getNextIndex::getAsLong); - registry.gauge(String.format(FOLLOWER_MATCH_INDEX, id), () -> getMatchIndex::getAsLong); - registry.gauge(String.format(FOLLOWER_RPC_RESP_TIME, id), () -> () -> getLastRpcTime.get().elapsedTimeMs()); + getRegistry().gauge(String.format(FOLLOWER_NEXT_INDEX, id), () -> getNextIndex::getAsLong); + getRegistry().gauge(String.format(FOLLOWER_MATCH_INDEX, id), () -> getMatchIndex::getAsLong); + getRegistry().gauge(String.format(FOLLOWER_RPC_RESP_TIME, id), () -> () -> getLastRpcTime.get().elapsedTimeMs()); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java index 7c1eb09cbd..15b00ca9d7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.metrics; +import org.apache.ratis.metrics.LongCounter; import org.apache.ratis.metrics.MetricRegistryInfo; import org.apache.ratis.metrics.RatisMetricRegistry; import org.apache.ratis.metrics.RatisMetrics; @@ -33,8 +34,12 @@ public class RaftLogMetricsBase extends RatisMetrics implements RaftLogMetrics { public static final String CONFIG_LOG_ENTRY_COUNT = "configLogEntryCount"; public static final String STATE_MACHINE_LOG_ENTRY_COUNT = "stateMachineLogEntryCount"; + private final LongCounter configLogEntryCount = getRegistry().counter(CONFIG_LOG_ENTRY_COUNT); + private final LongCounter metadataLogEntryCount = getRegistry().counter(METADATA_LOG_ENTRY_COUNT); + private final LongCounter stateMachineLogEntryCount = getRegistry().counter(STATE_MACHINE_LOG_ENTRY_COUNT); + public RaftLogMetricsBase(RaftGroupMemberId serverId) { - this.registry = getLogWorkerMetricRegistry(serverId); + super(getLogWorkerMetricRegistry(serverId)); } public static RatisMetricRegistry getLogWorkerMetricRegistry(RaftGroupMemberId serverId) { @@ -47,13 +52,13 @@ public static RatisMetricRegistry getLogWorkerMetricRegistry(RaftGroupMemberId s public void onLogEntryCommitted(LogEntryHeader header) { switch (header.getLogEntryBodyCase()) { case CONFIGURATIONENTRY: - registry.counter(CONFIG_LOG_ENTRY_COUNT).inc(); + configLogEntryCount.inc(); return; case METADATAENTRY: - registry.counter(METADATA_LOG_ENTRY_COUNT).inc(); + metadataLogEntryCount.inc(); return; case STATEMACHINELOGENTRY: - registry.counter(STATE_MACHINE_LOG_ENTRY_COUNT).inc(); + stateMachineLogEntryCount.inc(); return; default: } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java index 45137cdf37..f219cb2115 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java @@ -74,17 +74,22 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer public static final String RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT = "numFailedClientStreamOnServer"; public static final String RATIS_SERVER_INSTALL_SNAPSHOT_COUNT = "numInstallSnapshot"; - private final LongCounter numRequestQueueLimitHits; - private final LongCounter numRequestsByteSizeLimitHits; - private final LongCounter numResourceLimitHits; + private final LongCounter numRequestQueueLimitHits = getRegistry().counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER); + private final LongCounter numRequestsByteSizeLimitHits = getRegistry().counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER); + private final LongCounter numResourceLimitHits = getRegistry().counter(RESOURCE_LIMIT_HIT_COUNTER); - private final LongCounter numFailedClientStaleRead; - private final LongCounter numFailedClientRead; - private final LongCounter numFailedClientWrite; - private final LongCounter numFailedClientWatch; - private final LongCounter numFailedClientStream; + private final LongCounter numFailedClientStaleRead + = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT); + private final LongCounter numFailedClientRead = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_READ_COUNT); + private final LongCounter numFailedClientWrite = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_WRITE_COUNT); + private final LongCounter numFailedClientWatch = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT); + private final LongCounter numFailedClientStream = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT); - private final LongCounter numInstallSnapshot; + + private final LongCounter numInstallSnapshot = getRegistry().counter(RATIS_SERVER_INSTALL_SNAPSHOT_COUNT); + + private final Function followerAppendEntryLatency + = newHeartbeatTimer(FOLLOWER_APPEND_ENTRIES_LATENCY); /** Follower Id -> heartbeat elapsed */ private final Map followerLastHeartbeatElapsedTimeMap = new HashMap<>(); @@ -114,21 +119,9 @@ public static void removeRaftServerMetrics(RaftGroupMemberId serverId) { public RaftServerMetricsImpl(RaftGroupMemberId serverId, Supplier> commitInfoCache, Supplier retryCacheStatistics) { - this.registry = getMetricRegistryForRaftServer(serverId.toString()); + super(createRegistry(serverId.toString())); this.commitInfoCache = commitInfoCache; - numRequestQueueLimitHits = registry.counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER); - numRequestsByteSizeLimitHits = registry.counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER); - numResourceLimitHits = registry.counter(RESOURCE_LIMIT_HIT_COUNTER); - - numFailedClientStaleRead = registry.counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT); - numFailedClientRead = registry.counter(RATIS_SERVER_FAILED_CLIENT_READ_COUNT); - numFailedClientWrite = registry.counter(RATIS_SERVER_FAILED_CLIENT_WRITE_COUNT); - numFailedClientWatch = registry.counter(RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT); - numFailedClientStream = registry.counter(RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT); - - numInstallSnapshot = registry.counter(RATIS_SERVER_INSTALL_SNAPSHOT_COUNT); - addPeerCommitIndexGauge(serverId.getPeerId()); addRetryCacheMetric(retryCacheStatistics); } @@ -153,18 +146,18 @@ public LongCounter getNumInstallSnapshot() { return numInstallSnapshot; } - private RatisMetricRegistry getMetricRegistryForRaftServer(String serverId) { + private static RatisMetricRegistry createRegistry(String serverId) { return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS, RATIS_SERVER_METRICS_DESC)); } private void addRetryCacheMetric(Supplier retryCacheStatistics) { - registry.gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> retryCacheStatistics.get().size()); - registry.gauge(RETRY_CACHE_HIT_COUNT_METRIC , () -> () -> retryCacheStatistics.get().hitCount()); - registry.gauge(RETRY_CACHE_HIT_RATE_METRIC , () -> () -> retryCacheStatistics.get().hitRate()); - registry.gauge(RETRY_CACHE_MISS_COUNT_METRIC , () -> () -> retryCacheStatistics.get().missCount()); - registry.gauge(RETRY_CACHE_MISS_RATE_METRIC , () -> () -> retryCacheStatistics.get().missRate()); + getRegistry().gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> retryCacheStatistics.get().size()); + getRegistry().gauge(RETRY_CACHE_HIT_COUNT_METRIC , () -> () -> retryCacheStatistics.get().hitCount()); + getRegistry().gauge(RETRY_CACHE_HIT_RATE_METRIC , () -> () -> retryCacheStatistics.get().hitRate()); + getRegistry().gauge(RETRY_CACHE_MISS_COUNT_METRIC , () -> () -> retryCacheStatistics.get().missCount()); + getRegistry().gauge(RETRY_CACHE_MISS_RATE_METRIC , () -> () -> retryCacheStatistics.get().missRate()); } /** @@ -176,8 +169,7 @@ public void addFollower(RaftPeerId followerName) { followerName); followerLastHeartbeatElapsedTimeMap.put(followerName, 0L); - registry.gauge(followerHbMetricKey, - () -> () -> followerLastHeartbeatElapsedTimeMap.get(followerName)); + getRegistry().gauge(followerHbMetricKey, () -> () -> followerLastHeartbeatElapsedTimeMap.get(followerName)); addPeerCommitIndexGauge(followerName); } @@ -185,8 +177,8 @@ public void addFollower(RaftPeerId followerName) { /** * Register a commit index tracker for the peer in cluster. */ - public void addPeerCommitIndexGauge(RaftPeerId peerId) { - registry.gauge(getPeerCommitIndexGaugeKey(peerId), () -> () -> Optional.ofNullable(commitInfoCache.get()) + private void addPeerCommitIndexGauge(RaftPeerId peerId) { + getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId), () -> () -> Optional.ofNullable(commitInfoCache.get()) .map(cache -> cache.apply(peerId)) .map(CommitInfoProto::getCommitIndex) .orElse(0L)); @@ -207,23 +199,19 @@ public void recordFollowerHeartbeatElapsedTime(RaftPeerId followerId, long elaps } public Timekeeper getFollowerAppendEntryTimer(boolean isHeartbeat) { - return registry.timer(FOLLOWER_APPEND_ENTRIES_LATENCY + (isHeartbeat ? "_heartbeat" : "")); - } - - public Timekeeper getTimer(String timerName) { - return registry.timer(timerName); + return followerAppendEntryLatency.apply(isHeartbeat); } public Timekeeper getClientRequestTimer(Type request) { if (request.is(TypeCase.READ)) { - return getTimer(RAFT_CLIENT_READ_REQUEST); + return getRegistry().timer(RAFT_CLIENT_READ_REQUEST); } else if (request.is(TypeCase.STALEREAD)) { - return getTimer(RAFT_CLIENT_STALE_READ_REQUEST); + return getRegistry().timer(RAFT_CLIENT_STALE_READ_REQUEST); } else if (request.is(TypeCase.WATCH)) { String watchType = Type.toString(request.getWatch().getReplication()); - return getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, watchType)); + return getRegistry().timer(String.format(RAFT_CLIENT_WATCH_REQUEST, watchType)); } else if (request.is(TypeCase.WRITE)) { - return getTimer(RAFT_CLIENT_WRITE_REQUEST); + return getRegistry().timer(RAFT_CLIENT_WRITE_REQUEST); } return null; } @@ -233,19 +221,19 @@ public void onRequestQueueLimitHit() { } public void addNumPendingRequestsGauge(Supplier queueSize) { - registry.gauge(REQUEST_QUEUE_SIZE, () -> queueSize); + getRegistry().gauge(REQUEST_QUEUE_SIZE, () -> queueSize); } public boolean removeNumPendingRequestsGauge() { - return registry.remove(REQUEST_QUEUE_SIZE); + return getRegistry().remove(REQUEST_QUEUE_SIZE); } public void addNumPendingRequestsMegaByteSize(Supplier megabyteSize) { - registry.gauge(REQUEST_MEGA_BYTE_SIZE, () -> megabyteSize); + getRegistry().gauge(REQUEST_MEGA_BYTE_SIZE, () -> megabyteSize); } public boolean removeNumPendingRequestsByteSize() { - return registry.remove(REQUEST_MEGA_BYTE_SIZE); + return getRegistry().remove(REQUEST_MEGA_BYTE_SIZE); } public void onRequestByteSizeLimitHit() { @@ -294,8 +282,4 @@ public void incFailedRequestCount(Type type) { public void onSnapshotInstalled() { numInstallSnapshot.inc(); } - - public RatisMetricRegistry getRegistry() { - return registry; - } } \ No newline at end of file diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java index 4e25f20cb9..a6a1af0acb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.metrics; +import org.apache.ratis.metrics.LongCounter; import org.apache.ratis.metrics.Timekeeper; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.util.JavaUtils; @@ -80,42 +81,54 @@ public class SegmentedRaftLogMetrics extends RaftLogMetricsBase { /** Time required to load and process raft log segments during restart */ public static final String RAFT_LOG_LOAD_SEGMENT_LATENCY = "segmentLoadLatency"; - private final Timekeeper flushTimer = getTimer(RAFT_LOG_FLUSH_TIME); - private final Timekeeper syncTimer = getTimer(RAFT_LOG_SYNC_TIME); - private final Timekeeper enqueuedTimer = getTimer(RAFT_LOG_TASK_QUEUE_TIME); - private final Timekeeper queuingDelayTimer = getTimer(RAFT_LOG_TASK_ENQUEUE_DELAY);; + private final Timekeeper flushTimer = getRegistry().timer(RAFT_LOG_FLUSH_TIME); + private final Timekeeper syncTimer = getRegistry().timer(RAFT_LOG_SYNC_TIME); + private final Timekeeper enqueuedTimer = getRegistry().timer(RAFT_LOG_TASK_QUEUE_TIME); + private final Timekeeper queuingDelayTimer = getRegistry().timer(RAFT_LOG_TASK_ENQUEUE_DELAY); - private final Timekeeper appendEntryTimer = getTimer(RAFT_LOG_APPEND_ENTRY_LATENCY); - private final Timekeeper readEntryTimer = getTimer(RAFT_LOG_READ_ENTRY_LATENCY); - private final Timekeeper loadSegmentTimer = getTimer(RAFT_LOG_LOAD_SEGMENT_LATENCY); - private final Timekeeper purgeTimer = getTimer(RAFT_LOG_PURGE_METRIC); + private final Timekeeper appendEntryTimer = getRegistry().timer(RAFT_LOG_APPEND_ENTRY_LATENCY); + private final Timekeeper readEntryTimer = getRegistry().timer(RAFT_LOG_READ_ENTRY_LATENCY); + private final Timekeeper loadSegmentTimer = getRegistry().timer(RAFT_LOG_LOAD_SEGMENT_LATENCY); + private final Timekeeper purgeTimer = getRegistry().timer(RAFT_LOG_PURGE_METRIC); + + private final LongCounter cacheHitCount = getRegistry().counter(RAFT_LOG_CACHE_HIT_COUNT); + private final LongCounter cacheMissCount= getRegistry().counter(RAFT_LOG_CACHE_MISS_COUNT); + private final LongCounter appendEntryCount = getRegistry().counter(RAFT_LOG_APPEND_ENTRY_COUNT); + private final LongCounter flushCount = getRegistry().counter(RAFT_LOG_FLUSH_COUNT); + + private final LongCounter numStateMachineDataWriteTimeout = getRegistry().counter( + RAFT_LOG_STATEMACHINE_DATA_WRITE_TIMEOUT_COUNT); + private final LongCounter numStateMachineDataReadTimeout = getRegistry().counter( + RAFT_LOG_STATEMACHINE_DATA_READ_TIMEOUT_COUNT); + + private final Map, Timekeeper> taskClassTimers = new ConcurrentHashMap<>(); public SegmentedRaftLogMetrics(RaftGroupMemberId serverId) { super(serverId); } public void addDataQueueSizeGauge(Supplier numElements) { - registry.gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> numElements); + getRegistry().gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> numElements); } public void addClosedSegmentsNum(Supplier cachedSegmentNum) { - registry.gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM, () -> cachedSegmentNum); + getRegistry().gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM, () -> cachedSegmentNum); } public void addClosedSegmentsSizeInBytes(Supplier closedSegmentsSizeInBytes) { - registry.gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES, () -> closedSegmentsSizeInBytes); + getRegistry().gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES, () -> closedSegmentsSizeInBytes); } public void addOpenSegmentSizeInBytes(Supplier openSegmentSizeInBytes) { - registry.gauge(RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES, () -> openSegmentSizeInBytes); + getRegistry().gauge(RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES, () -> openSegmentSizeInBytes); } public void addLogWorkerQueueSizeGauge(Supplier queueSize) { - registry.gauge(RAFT_LOG_WORKER_QUEUE_SIZE, () -> queueSize); + getRegistry().gauge(RAFT_LOG_WORKER_QUEUE_SIZE, () -> queueSize); } public void addFlushBatchSizeGauge(Supplier flushBatchSize) { - registry.gauge(RAFT_LOG_SYNC_BATCH_SIZE, () -> flushBatchSize); + getRegistry().gauge(RAFT_LOG_SYNC_BATCH_SIZE, () -> flushBatchSize); } public UncheckedAutoCloseable startFlushTimer() { @@ -127,19 +140,19 @@ public Timekeeper getSyncTimer() { } public void onRaftLogCacheHit() { - registry.counter(RAFT_LOG_CACHE_HIT_COUNT).inc(); + cacheHitCount.inc(); } public void onRaftLogCacheMiss() { - registry.counter(RAFT_LOG_CACHE_MISS_COUNT).inc(); + cacheMissCount.inc(); } public void onRaftLogFlush() { - registry.counter(RAFT_LOG_FLUSH_COUNT).inc(); + flushCount.inc(); } public void onRaftLogAppendEntry() { - registry.counter(RAFT_LOG_APPEND_ENTRY_COUNT).inc(); + appendEntryCount.inc(); } public UncheckedAutoCloseable startAppendEntryTimer() { @@ -154,13 +167,12 @@ public UncheckedAutoCloseable startQueuingDelayTimer() { return Timekeeper.start(queuingDelayTimer); } - private final Map, Timekeeper> classMap = new ConcurrentHashMap<>(); - private Timekeeper getTaskExecutionTimer(Class taskClass) { - return getTimer(String.format(RAFT_LOG_TASK_EXECUTION_TIME, + private Timekeeper newTaskExecutionTimer(Class taskClass) { + return getRegistry().timer(String.format(RAFT_LOG_TASK_EXECUTION_TIME, JavaUtils.getClassSimpleName(taskClass).toLowerCase())); } public UncheckedAutoCloseable startTaskExecutionTimer(Class taskClass) { - return Timekeeper.start(classMap.computeIfAbsent(taskClass, this::getTaskExecutionTimer)); + return Timekeeper.start(taskClassTimers.computeIfAbsent(taskClass, this::newTaskExecutionTimer)); } public Timekeeper getReadEntryTimer() { @@ -176,11 +188,11 @@ public UncheckedAutoCloseable startPurgeTimer() { } public void onStateMachineDataWriteTimeout() { - registry.counter(RAFT_LOG_STATEMACHINE_DATA_WRITE_TIMEOUT_COUNT).inc(); + numStateMachineDataWriteTimeout.inc(); } @Override public void onStateMachineDataReadTimeout() { - registry.counter(RAFT_LOG_STATEMACHINE_DATA_READ_TIMEOUT_COUNT).inc(); + numStateMachineDataReadTimeout.inc(); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index d4dce4b2fc..dd36b00325 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -22,6 +22,7 @@ import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WATCH_REQUEST; import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WRITE_REQUEST; +import org.apache.ratis.metrics.RatisMetricRegistry; import org.apache.ratis.server.storage.RaftStorage; import org.apache.log4j.Level; import org.apache.ratis.BaseTest; @@ -302,33 +303,34 @@ void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws IOExce ExecutionException, InterruptedException { final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); final RaftServerMetricsImpl raftServerMetrics = getRaftServerMetrics(leader); + final RatisMetricRegistry registry = raftServerMetrics.getRegistry(); try (final RaftClient client = cluster.createClient()) { final CompletableFuture f1 = client.async().send(new SimpleMessage("testing")); Assert.assertTrue(f1.get().isSuccess()); - final DefaultTimekeeperImpl write = (DefaultTimekeeperImpl) raftServerMetrics.getTimer(RAFT_CLIENT_WRITE_REQUEST); + final DefaultTimekeeperImpl write = (DefaultTimekeeperImpl) registry.timer(RAFT_CLIENT_WRITE_REQUEST); Assert.assertTrue(write.getTimer().getCount() > 0); final CompletableFuture f2 = client.async().sendReadOnly(new SimpleMessage("testing")); Assert.assertTrue(f2.get().isSuccess()); - final DefaultTimekeeperImpl read = (DefaultTimekeeperImpl) raftServerMetrics.getTimer(RAFT_CLIENT_READ_REQUEST); + final DefaultTimekeeperImpl read = (DefaultTimekeeperImpl) registry.timer(RAFT_CLIENT_READ_REQUEST); Assert.assertTrue(read.getTimer().getCount() > 0); final CompletableFuture f3 = client.async().sendStaleRead(new SimpleMessage("testing"), 0, leader.getId()); Assert.assertTrue(f3.get().isSuccess()); - final DefaultTimekeeperImpl staleRead = (DefaultTimekeeperImpl) raftServerMetrics.getTimer(RAFT_CLIENT_STALE_READ_REQUEST); + final DefaultTimekeeperImpl staleRead = (DefaultTimekeeperImpl) registry.timer(RAFT_CLIENT_STALE_READ_REQUEST); Assert.assertTrue(staleRead.getTimer().getCount() > 0); final CompletableFuture f4 = client.async().watch(0, RaftProtos.ReplicationLevel.ALL); Assert.assertTrue(f4.get().isSuccess()); - final DefaultTimekeeperImpl watchAll = (DefaultTimekeeperImpl) raftServerMetrics.getTimer( + final DefaultTimekeeperImpl watchAll = (DefaultTimekeeperImpl) registry.timer( String.format(RAFT_CLIENT_WATCH_REQUEST, "-ALL")); Assert.assertTrue(watchAll.getTimer().getCount() > 0); final CompletableFuture f5 = client.async().watch(0, RaftProtos.ReplicationLevel.MAJORITY); Assert.assertTrue(f5.get().isSuccess()); - final DefaultTimekeeperImpl watch = (DefaultTimekeeperImpl) raftServerMetrics.getTimer( + final DefaultTimekeeperImpl watch = (DefaultTimekeeperImpl) registry.timer( String.format(RAFT_CLIENT_WATCH_REQUEST, "")); Assert.assertTrue(watch.getTimer().getCount() > 0); } From 6e8cbfbfb7435ade2c1a96881629cc79432eaa26 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 15 Sep 2022 23:30:06 +0800 Subject: [PATCH 2/2] Address review comments. --- .../ratis/grpc/metrics/MessageMetrics.java | 31 ++++++++++--------- .../apache/ratis/metrics/RatisMetrics.java | 7 +++++ .../metrics/NettyServerStreamRpcMetrics.java | 4 +-- .../server/metrics/LeaderElectionMetrics.java | 4 +-- .../server/metrics/RaftLogMetricsBase.java | 4 +-- .../server/metrics/RaftServerMetricsImpl.java | 17 ++++++---- .../server/impl/LeaderElectionTests.java | 2 +- .../server/raftlog/TestRaftLogMetrics.java | 2 +- .../segmented/TestSegmentedRaftLog.java | 4 +-- 9 files changed, 44 insertions(+), 31 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java index 2ff53c7b85..b152c67098 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/MessageMetrics.java @@ -19,6 +19,7 @@ import org.apache.ratis.metrics.LongCounter; import org.apache.ratis.metrics.MetricRegistryInfo; +import org.apache.ratis.metrics.RatisMetricRegistry; import org.apache.ratis.metrics.RatisMetrics; import java.util.Map; @@ -46,19 +47,19 @@ String getSuffix() { private final Map> types; public MessageMetrics(String endpointId, String endpointType) { - super(create( - new MetricRegistryInfo(endpointId, - RATIS_APPLICATION_NAME_METRICS, - String.format(GRPC_MESSAGE_METRICS, endpointType), - GRPC_MESSAGE_METRICS_DESC) - )); - + super(createRegistry(endpointId, endpointType)); this.types = newCounterMaps(Type.class); } - private void inc(Type t, String rpcType) { + private static RatisMetricRegistry createRegistry(String endpointId, String endpointType) { + final String name = String.format(GRPC_MESSAGE_METRICS, endpointType); + return create(new MetricRegistryInfo(endpointId, + RATIS_APPLICATION_NAME_METRICS, name, GRPC_MESSAGE_METRICS_DESC)); + } + + private void inc(String metricNamePrefix, Type t) { types.get(t) - .computeIfAbsent(rpcType, key -> getRegistry().counter(key + t.getSuffix())) + .computeIfAbsent(metricNamePrefix, prefix -> getRegistry().counter(prefix + t.getSuffix())) .inc(); } @@ -66,22 +67,22 @@ private void inc(Type t, String rpcType) { * Increments the count of RPCs that are started. * Both client and server use this. */ - public void rpcStarted(String rpcType){ - inc(Type.STARTED, rpcType); + public void rpcStarted(String metricNamePrefix){ + inc(metricNamePrefix, Type.STARTED); } /** * Increments the count of RPCs that were started and got completed. * Both client and server use this. */ - public void rpcCompleted(String rpcType){ - inc(Type.COMPLETED, rpcType); + public void rpcCompleted(String metricNamePrefix){ + inc(metricNamePrefix, Type.COMPLETED); } /** * Increments the count of RPCs received on the server. */ - public void rpcReceived(String rpcType){ - inc(Type.RECEIVED, rpcType); + public void rpcReceived(String metricNamePrefix){ + inc(metricNamePrefix, Type.RECEIVED); } } diff --git a/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java b/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java index a6ae90d779..7481394ee8 100644 --- a/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java +++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/RatisMetrics.java @@ -49,6 +49,13 @@ protected static > Map> newCounter return Collections.unmodifiableMap(maps); } + protected static > Map newTimerMap( + Class clazz, Function constructor) { + final EnumMap map = new EnumMap<>(clazz); + Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t, constructor.apply(t))); + return Collections.unmodifiableMap(map); + } + protected static RatisMetricRegistry create(MetricRegistryInfo info) { Optional metricRegistry = MetricRegistries.global().get(info); return metricRegistry.orElseGet(() -> { diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java index cdcc245ef4..d29dadab91 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/metrics/NettyServerStreamRpcMetrics.java @@ -112,12 +112,12 @@ String getString(RequestType type) { private final Map> ops; public NettyServerStreamRpcMetrics(String serverId) { - super(getMetricRegistryForGrpcServer(serverId)); + super(createRegistry(serverId)); this.ops = newCounterMaps(Op.class); } - private static RatisMetricRegistry getMetricRegistryForGrpcServer(String serverId) { + private static RatisMetricRegistry createRegistry(String serverId) { return create(new MetricRegistryInfo(serverId, METRICS_APP_NAME, METRICS_COMP_NAME, METRICS_DESC)); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java index 9216aaeb35..7447498d42 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java @@ -54,14 +54,14 @@ public final class LeaderElectionMetrics extends RatisMetrics { private volatile Timestamp lastElectionTime; private LeaderElectionMetrics(RaftGroupMemberId serverId, LongSupplier getLastLeaderElapsedTimeMs) { - super(getMetricRegistryForLeaderElection(serverId)); + super(createRegistry(serverId)); getRegistry().gauge(LAST_LEADER_ELAPSED_TIME, () -> getLastLeaderElapsedTimeMs::getAsLong); getRegistry().gauge(LAST_LEADER_ELECTION_ELAPSED_TIME, () -> () -> Optional.ofNullable(lastElectionTime).map(Timestamp::elapsedTimeMs).orElse(-1L)); } - public static RatisMetricRegistry getMetricRegistryForLeaderElection(RaftGroupMemberId serverId) { + public static RatisMetricRegistry createRegistry(RaftGroupMemberId serverId) { return create(new MetricRegistryInfo(serverId.toString(), RATIS_APPLICATION_NAME_METRICS, RATIS_LEADER_ELECTION_METRICS, RATIS_LEADER_ELECTION_METRICS_DESC)); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java index 15b00ca9d7..17a200f004 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetricsBase.java @@ -39,10 +39,10 @@ public class RaftLogMetricsBase extends RatisMetrics implements RaftLogMetrics { private final LongCounter stateMachineLogEntryCount = getRegistry().counter(STATE_MACHINE_LOG_ENTRY_COUNT); public RaftLogMetricsBase(RaftGroupMemberId serverId) { - super(getLogWorkerMetricRegistry(serverId)); + super(createRegistry(serverId)); } - public static RatisMetricRegistry getLogWorkerMetricRegistry(RaftGroupMemberId serverId) { + public static RatisMetricRegistry createRegistry(RaftGroupMemberId serverId) { return create(new MetricRegistryInfo(serverId.toString(), RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_WORKER_METRICS, RATIS_LOG_WORKER_METRICS_DESC)); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java index f219cb2115..f37cfd4642 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java @@ -30,6 +30,7 @@ import org.apache.ratis.metrics.MetricRegistryInfo; import org.apache.ratis.metrics.RatisMetricRegistry; +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase; import org.apache.ratis.protocol.RaftClientRequest.Type; @@ -85,9 +86,14 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer private final LongCounter numFailedClientWatch = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_WATCH_COUNT); private final LongCounter numFailedClientStream = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT); - private final LongCounter numInstallSnapshot = getRegistry().counter(RATIS_SERVER_INSTALL_SNAPSHOT_COUNT); + private final Timekeeper readTimer = getRegistry().timer(RAFT_CLIENT_READ_REQUEST); + private final Timekeeper staleReadTimer = getRegistry().timer(RAFT_CLIENT_STALE_READ_REQUEST); + private final Timekeeper writeTimer = getRegistry().timer(RAFT_CLIENT_WRITE_REQUEST); + private final Map watchTimers = newTimerMap(ReplicationLevel.class, + replication -> getRegistry().timer(String.format(RAFT_CLIENT_WATCH_REQUEST, Type.toString(replication)))); + private final Function followerAppendEntryLatency = newHeartbeatTimer(FOLLOWER_APPEND_ENTRIES_LATENCY); @@ -204,14 +210,13 @@ public Timekeeper getFollowerAppendEntryTimer(boolean isHeartbeat) { public Timekeeper getClientRequestTimer(Type request) { if (request.is(TypeCase.READ)) { - return getRegistry().timer(RAFT_CLIENT_READ_REQUEST); + return readTimer; } else if (request.is(TypeCase.STALEREAD)) { - return getRegistry().timer(RAFT_CLIENT_STALE_READ_REQUEST); + return staleReadTimer; } else if (request.is(TypeCase.WATCH)) { - String watchType = Type.toString(request.getWatch().getReplication()); - return getRegistry().timer(String.format(RAFT_CLIENT_WATCH_REQUEST, watchType)); + return watchTimers.get(request.getWatch().getReplication()); } else if (request.is(TypeCase.WRITE)) { - return getRegistry().timer(RAFT_CLIENT_WRITE_REQUEST); + return writeTimer; } return null; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index a88f2b0189..68b23a4925 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -452,7 +452,7 @@ public void testLeaderElectionMetrics() throws IOException, InterruptedException final RaftServer.Division leaderServer = waitForLeader(cluster); final RatisMetricRegistryImpl ratisMetricRegistry = (RatisMetricRegistryImpl) - LeaderElectionMetrics.getMetricRegistryForLeaderElection(leaderServer.getMemberId()); + LeaderElectionMetrics.createRegistry(leaderServer.getMemberId()); // Verify each metric individually. long numLeaderElections = ratisMetricRegistry.counter(LEADER_ELECTION_COUNT_METRIC).getCount(); diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java index a408216a08..70b185e554 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java @@ -120,7 +120,7 @@ static void assertCommitCount(RaftServer.Division server, int expectedMsgs) { } static RatisMetricRegistryImpl getRegistry(RaftGroupMemberId memberId) { - return (RatisMetricRegistryImpl) RaftLogMetricsBase.getLogWorkerMetricRegistry(memberId); + return (RatisMetricRegistryImpl) RaftLogMetricsBase.createRegistry(memberId); } static void assertFlushCount(RaftServer.Division server) throws Exception { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index 2a600570f5..f8b9c960da 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -232,7 +232,7 @@ public void testLoadLogSegments() throws Exception { Assert.assertArrayEquals(entries, entriesFromLog); Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog)); - final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.getLogWorkerMetricRegistry(memberId); + final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry(memberId); final DefaultTimekeeperImpl load = (DefaultTimekeeperImpl) metricRegistryForLogWorker.timer("segmentLoadLatency"); assertTrue(load.getTimer().getMeanRate() > 0); @@ -482,7 +482,7 @@ public void testPurgeLogMetric() throws Exception { int segmentSize = 200; long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1; long expectedIndex = segmentSize * (endTerm - startTerm - 1); - final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.getLogWorkerMetricRegistry(memberId); + final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry(memberId); purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex); final DefaultTimekeeperImpl purge = (DefaultTimekeeperImpl) metricRegistryForLogWorker.timer("purgeLog"); assertTrue(purge.getTimer().getCount() > 0);