diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index dd886eb0700..a3c4ed5efff 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -90,7 +90,8 @@ public RespT callWithRetry( return ClientCalls.blockingUnaryCall( stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); if (logger.isTraceEnabled()) { logger.trace(String.format("leaving %s...", method.getFullMethodName())); @@ -118,7 +119,8 @@ protected void callAsyncWithRetry( responseObserver); return null; }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); } @@ -139,7 +141,8 @@ StreamObserver callBidiStreamingWithRetry( return asyncBidiStreamingCall( stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); return observer; } @@ -162,7 +165,8 @@ public StreamingResponse callServerStreamingWithRetry( blockingServerStreamingCall( stub.getChannel(), method, stub.getCallOptions(), requestFactory.get())); }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); return response; } diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 7d0223c5529..f400242ef43 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -67,6 +67,13 @@ public class ConfigUtils { public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms"; public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms"; public static final String TIKV_BO_REGION_MISS_BASE_IN_MS = "tikv.bo_region_miss_base_in_ms"; + public static final String TIKV_RAWKV_READ_SLOWLOG_IN_MS = "tikv.rawkv.read_slowlog_in_ms"; + public static final String TIKV_RAWKV_WRITE_SLOWLOG_IN_MS = "tikv.rawkv.write_slowlog_in_ms"; + public static final String TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS = + "tikv.rawkv.batch_read_slowlog_in_ms"; + public static final String TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS = + "tikv.rawkv.batch_write_slowlog_in_ms"; + public static final String TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "tikv.rawkv.scan_slowlog_in_ms"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; @@ -110,6 +117,7 @@ public class ConfigUtils { public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000; public static final int DEF_TIKV_BO_REGION_MISS_BASE_IN_MS = 20; + public static final String DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "5000"; public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 614cfbdeaa3..54e96224017 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -119,6 +119,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS); + setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); } public static void listAll() { @@ -169,6 +170,10 @@ public static int getInt(String key) { return Integer.parseInt(get(key)); } + public static Optional getIntOption(String key) { + return getOption(key).map(Integer::parseInt); + } + private static int getInt(String key, int defaultValue) { try { return getOption(key).map(Integer::parseInt).orElse(defaultValue); @@ -315,6 +320,13 @@ private static ReplicaRead getReplicaRead(String key) { private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS); private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); + private Optional rawKVReadSlowLogInMS = getIntOption(TIKV_RAWKV_READ_SLOWLOG_IN_MS); + private Optional rawKVWriteSlowLogInMS = getIntOption(TIKV_RAWKV_WRITE_SLOWLOG_IN_MS); + private Optional rawKVBatchReadSlowLogInMS = + getIntOption(TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS); + private Optional rawKVBatchWriteSlowLogInMS = + getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS); + private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); public enum KVMode { TXN, @@ -677,4 +689,44 @@ public int getRawKVCleanTimeoutInMS() { public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) { this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS; } + + public Integer getRawKVReadSlowLogInMS() { + return rawKVReadSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVReadSlowLogInMS(Integer rawKVReadSlowLogInMS) { + this.rawKVReadSlowLogInMS = Optional.of(rawKVReadSlowLogInMS); + } + + public Integer getRawKVWriteSlowLogInMS() { + return rawKVWriteSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVWriteSlowLogInMS(Integer rawKVWriteSlowLogInMS) { + this.rawKVWriteSlowLogInMS = Optional.of(rawKVWriteSlowLogInMS); + } + + public Integer getRawKVBatchReadSlowLogInMS() { + return rawKVBatchReadSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVBatchReadSlowLogInMS(Integer rawKVBatchReadSlowLogInMS) { + this.rawKVBatchReadSlowLogInMS = Optional.of(rawKVBatchReadSlowLogInMS); + } + + public Integer getRawKVBatchWriteSlowLogInMS() { + return rawKVBatchWriteSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVBatchWriteSlowLogInMS(Integer rawKVBatchWriteSlowLogInMS) { + this.rawKVBatchWriteSlowLogInMS = Optional.of(rawKVBatchWriteSlowLogInMS); + } + + public int getRawKVScanSlowLogInMS() { + return rawKVScanSlowLogInMS; + } + + public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) { + this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS; + } } diff --git a/src/main/java/org/tikv/common/log/SlowLog.java b/src/main/java/org/tikv/common/log/SlowLog.java new file mode 100644 index 00000000000..e172a003462 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLog.java @@ -0,0 +1,26 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +public interface SlowLog { + void addProperty(String key, String value); + + SlowLogSpan start(String name); + + void log(); +} diff --git a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java new file mode 100644 index 00000000000..37e68237db5 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java @@ -0,0 +1,35 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +public class SlowLogEmptyImpl implements SlowLog { + public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl(); + + private SlowLogEmptyImpl() {} + + @Override + public void addProperty(String key, String value) {} + + @Override + public SlowLogSpan start(String name) { + return SlowLogSpanEmptyImpl.INSTANCE; + } + + @Override + public void log() {} +} diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java new file mode 100644 index 00000000000..76ed4c922bf --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -0,0 +1,93 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SlowLogImpl implements SlowLog { + private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class); + + private static final int MAX_SPAN_SIZE = 1024; + + public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); + + private final List slowLogSpans = new ArrayList<>(); + + private final long startMS; + private final long slowThresholdMS; + + /** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */ + private final Map properties; + + public SlowLogImpl(long slowThresholdMS, Map properties) { + this.startMS = System.currentTimeMillis(); + this.slowThresholdMS = slowThresholdMS; + this.properties = new HashMap<>(properties); + } + + @Override + public void addProperty(String key, String value) { + this.properties.put(key, value); + } + + @Override + public synchronized SlowLogSpan start(String name) { + SlowLogSpan slowLogSpan = new SlowLogSpanImpl(name); + if (slowLogSpans.size() < MAX_SPAN_SIZE) { + slowLogSpans.add(slowLogSpan); + } + slowLogSpan.start(); + return slowLogSpan; + } + + @Override + public void log() { + long currentMS = System.currentTimeMillis(); + if (slowThresholdMS >= 0 && currentMS - startMS > slowThresholdMS) { + logger.warn("SlowLog:" + getSlowLogString(currentMS)); + } + } + + private String getSlowLogString(long currentMS) { + JsonObject jsonObject = new JsonObject(); + + jsonObject.addProperty("start", DATE_FORMAT.format(startMS)); + jsonObject.addProperty("end", DATE_FORMAT.format(currentMS)); + jsonObject.addProperty("duration", (currentMS - startMS) + "ms"); + + for (Map.Entry entry : properties.entrySet()) { + jsonObject.addProperty(entry.getKey(), entry.getValue()); + } + + JsonArray jsonArray = new JsonArray(); + for (SlowLogSpan slowLogSpan : slowLogSpans) { + jsonArray.add(slowLogSpan.toJsonElement()); + } + jsonObject.add("spans", jsonArray); + + return jsonObject.toString(); + } +} diff --git a/src/main/java/org/tikv/common/log/SlowLogSpan.java b/src/main/java/org/tikv/common/log/SlowLogSpan.java new file mode 100644 index 00000000000..a56e9df1885 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogSpan.java @@ -0,0 +1,28 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import com.google.gson.JsonElement; + +public interface SlowLogSpan { + void start(); + + void end(); + + JsonElement toJsonElement(); +} diff --git a/src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java new file mode 100644 index 00000000000..11231d1c685 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java @@ -0,0 +1,39 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +public class SlowLogSpanEmptyImpl implements SlowLogSpan { + + public static final SlowLogSpanEmptyImpl INSTANCE = new SlowLogSpanEmptyImpl(); + + private SlowLogSpanEmptyImpl() {} + + @Override + public void start() {} + + @Override + public void end() {} + + @Override + public JsonElement toJsonElement() { + return new JsonObject(); + } +} diff --git a/src/main/java/org/tikv/common/log/SlowLogSpanImpl.java b/src/main/java/org/tikv/common/log/SlowLogSpanImpl.java new file mode 100644 index 00000000000..960baeb41dc --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogSpanImpl.java @@ -0,0 +1,77 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import static org.tikv.common.log.SlowLogImpl.DATE_FORMAT; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +public class SlowLogSpanImpl implements SlowLogSpan { + private final String name; + private long startMS; + private long endMS; + + public SlowLogSpanImpl(String name) { + this.name = name; + this.startMS = 0; + this.endMS = 0; + } + + @Override + public void start() { + this.startMS = System.currentTimeMillis(); + } + + @Override + public void end() { + this.endMS = System.currentTimeMillis(); + } + + @Override + public JsonElement toJsonElement() { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("name", name); + jsonObject.addProperty("start", getStartString()); + jsonObject.addProperty("end", getEndString()); + jsonObject.addProperty("duration", getDurationString()); + + return jsonObject; + } + + private String getStartString() { + if (startMS == 0) { + return "N/A"; + } + return DATE_FORMAT.format(startMS); + } + + private String getEndString() { + if (endMS == 0) { + return "N/A"; + } + return DATE_FORMAT.format(endMS); + } + + private String getDurationString() { + if (startMS == 0 || endMS == 0) { + return "N/A"; + } + return (endMS - startMS) + "ms"; + } +} diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 230ceedaa34..f88772d255b 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -217,7 +217,7 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List c @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - if (recv.onStoreUnreachable()) { + if (recv.onStoreUnreachable(backOffer.getSlowLog())) { if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) { regionManager.onRequestFail(recv.getRegion()); throw new GrpcException("retry is exhausted.", e); diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index 73cb9dc0e93..f27c1eaaac1 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -21,6 +21,7 @@ import io.prometheus.client.Histogram; import java.util.concurrent.Callable; import org.tikv.common.exception.GrpcException; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.operation.ErrorHandler; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; @@ -67,8 +68,10 @@ private void rethrowNotRecoverableException(Exception e) { } } - public RespT callWithRetry(Callable proc, String methodName) { + public RespT callWithRetry(Callable proc, String methodName, BackOffer backOffer) { Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer(); + SlowLogSpan callWithRetrySlowLogSpan = + backOffer.getSlowLog().start("callWithRetry " + methodName); try { while (true) { RespT result = null; @@ -76,9 +79,11 @@ public RespT callWithRetry(Callable proc, String methodName) { // add single request duration histogram Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer(); + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC " + methodName); try { result = proc.call(); } finally { + slowLogSpan.end(); requestTimer.observeDuration(); } } catch (Exception e) { @@ -105,6 +110,7 @@ public RespT callWithRetry(Callable proc, String methodName) { } } finally { callWithRetryTimer.observeDuration(); + callWithRetrySlowLogSpan.end(); } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 96e620279aa..8b102f04fc5 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -35,6 +35,9 @@ import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; +import org.tikv.common.log.SlowLog; +import org.tikv.common.log.SlowLogEmptyImpl; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.Pair; import org.tikv.kvproto.Kvrpcpb; @@ -80,7 +83,9 @@ protected AbstractRegionStoreClient( if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); } else if (!this.store.isReachable()) { - onStoreUnreachable(); + // cannot get Deadline or SlowLog instance here + // use SlowLogEmptyImpl instead to skip slow log record + onStoreUnreachable(SlowLogEmptyImpl.INSTANCE); } } @@ -130,7 +135,7 @@ public boolean onNotLeader(TiRegion newRegion) { } @Override - public boolean onStoreUnreachable() { + public boolean onStoreUnreachable(SlowLog slowLog) { if (!store.isValid()) { logger.warn(String.format("store [%d] has been invalid", store.getId())); store = regionManager.getStoreById(store.getId()); @@ -148,13 +153,13 @@ public boolean onStoreUnreachable() { } // seek an available leader store to send request - Boolean result = seekLeaderStore(); + Boolean result = seekLeaderStore(slowLog); if (result != null) { return result; } if (conf.getEnableGrpcForward()) { // seek an available proxy store to forward request - return seekProxyStore(); + return seekProxyStore(slowLog); } return false; } @@ -187,8 +192,9 @@ private void updateClientStub() { } } - private Boolean seekLeaderStore() { + private Boolean seekLeaderStore(SlowLog slowLog) { Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); + SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore"); try { List peers = region.getFollowerList(); if (peers.isEmpty()) { @@ -237,11 +243,13 @@ private Boolean seekLeaderStore() { } } finally { switchLeaderDurationTimer.observeDuration(); + slowLogSpan.end(); } return null; } - private boolean seekProxyStore() { + private boolean seekProxyStore(SlowLog slowLog) { + SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore"); Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); @@ -259,6 +267,7 @@ private boolean seekProxyStore() { return true; } finally { grpcForwardDurationTimer.observeDuration(); + slowLogSpan.end(); } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 4bee1356eab..307538d4a82 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -17,11 +17,13 @@ package org.tikv.common.region; +import org.tikv.common.log.SlowLog; + public interface RegionErrorReceiver { boolean onNotLeader(TiRegion region); /// return whether we need to retry this request. - boolean onStoreUnreachable(); + boolean onStoreUnreachable(SlowLog slowLog); TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/util/BackOffer.java b/src/main/java/org/tikv/common/util/BackOffer.java index b5081a39b5c..91b3d7c0dfe 100644 --- a/src/main/java/org/tikv/common/util/BackOffer.java +++ b/src/main/java/org/tikv/common/util/BackOffer.java @@ -17,6 +17,8 @@ package org.tikv.common.util; +import org.tikv.common.log.SlowLog; + public interface BackOffer { // Back off types. int seconds = 1000; @@ -58,4 +60,6 @@ enum BackOffStrategy { // DecorrJitter increases the maximum jitter based on the last random value. DecorrJitter } + + SlowLog getSlowLog(); } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 141bb5ed76e..89c79c30783 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -29,6 +29,9 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; +import org.tikv.common.log.SlowLog; +import org.tikv.common.log.SlowLogEmptyImpl; +import org.tikv.common.log.SlowLogSpan; public class ConcreteBackOffer implements BackOffer { private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class); @@ -37,6 +40,7 @@ public class ConcreteBackOffer implements BackOffer { private final List errors; private int totalSleep; private final long deadline; + private final SlowLog slowLog; public static final Histogram BACKOFF_DURATION = Histogram.build() @@ -45,7 +49,7 @@ public class ConcreteBackOffer implements BackOffer { .labelNames("type") .register(); - private ConcreteBackOffer(int maxSleep, long deadline) { + private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog) { Preconditions.checkArgument( maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0."); Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0."); @@ -54,6 +58,7 @@ private ConcreteBackOffer(int maxSleep, long deadline) { this.errors = new ArrayList<>(); this.backOffFunctionMap = new HashMap<>(); this.deadline = deadline; + this.slowLog = slowLog; } private ConcreteBackOffer(ConcreteBackOffer source) { @@ -62,39 +67,40 @@ private ConcreteBackOffer(ConcreteBackOffer source) { this.errors = source.errors; this.backOffFunctionMap = source.backOffFunctionMap; this.deadline = source.deadline; + this.slowLog = source.slowLog; } - public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs) { + public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) { long deadline = System.currentTimeMillis() + timeoutInMs; - return new ConcreteBackOffer(0, deadline); + return new ConcreteBackOffer(0, deadline, slowLog); } public static ConcreteBackOffer newCustomBackOff(int maxSleep) { - return new ConcreteBackOffer(maxSleep, 0); + return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newScannerNextMaxBackOff() { - return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0); + return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newBatchGetMaxBackOff() { - return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0); + return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newCopNextMaxBackOff() { - return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0); + return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newGetBackOff() { - return new ConcreteBackOffer(GET_MAX_BACKOFF, 0); + return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newRawKVBackOff() { - return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0); + return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newTsoBackOff() { - return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0); + return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer create(BackOffer source) { @@ -151,6 +157,7 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { } public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { + SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); BackOffFunction backOffFunction = backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc); @@ -171,8 +178,10 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long Thread.sleep(sleep); } catch (InterruptedException e) { throw new GrpcException(e); + } finally { + slowLogSpan.end(); + backOffTimer.observeDuration(); } - backOffTimer.observeDuration(); if (maxSleep > 0 && totalSleep >= maxSleep) { logger.warn(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep)); return false; @@ -206,4 +215,9 @@ private void logThrowError(Exception err) { // Use the last backoff type to generate an exception throw new GrpcException("retry is exhausted.", err); } + + @Override + public SlowLog getSlowLog() { + return slowLog; + } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index fdd96600b96..e488ab71886 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -27,8 +27,12 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; +import org.tikv.common.log.SlowLog; +import org.tikv.common.log.SlowLogEmptyImpl; +import org.tikv.common.log.SlowLogImpl; import org.tikv.common.operation.iterator.RawScanIterator; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; @@ -129,10 +133,21 @@ public void putAtomic(ByteString key, ByteString value, long ttl) { private void put(ByteString key, ByteString value, long ttl, boolean atomic) { String label = "client_raw_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVWriteSlowLogInMS(), + new HashMap(2) { + { + put("func", "put"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { client.rawPut(backOffer, key, value, ttl, atomic); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -147,6 +162,7 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -174,10 +190,21 @@ public ByteString putIfAbsent(ByteString key, ByteString value) { public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVWriteSlowLogInMS(), + new HashMap(2) { + { + put("func", "putIfAbsent"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -192,6 +219,7 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -236,9 +264,18 @@ public void batchPutAtomic(Map kvPairs, long ttl) { private void batchPut(Map kvPairs, long ttl, boolean atomic) { String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVBatchWriteSlowLogInMS(), + new HashMap(2) { + { + put("func", "batchPut"); + put("keySize", String.valueOf(kvPairs.size())); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); doSendBatchPut(backOffer, kvPairs, ttl, atomic, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -247,6 +284,7 @@ private void batchPut(Map kvPairs, long ttl, boolean ato throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -259,10 +297,22 @@ private void batchPut(Map kvPairs, long ttl, boolean ato public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVReadSlowLogInMS(), + new HashMap(2) { + { + put("func", "get"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { ByteString result = client.rawGet(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -277,6 +327,7 @@ public ByteString get(ByteString key) { throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -289,9 +340,18 @@ public ByteString get(ByteString key) { public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVBatchReadSlowLogInMS(), + new HashMap(2) { + { + put("func", "batchGet"); + put("keySize", String.valueOf(keys.size())); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog); try { - BackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS(); List result = doSendBatchGet(backOffer, keys, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -301,6 +361,7 @@ public List batchGet(List keys) { throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -325,9 +386,18 @@ public void batchDeleteAtomic(List keys) { private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVBatchWriteSlowLogInMS(), + new HashMap(2) { + { + put("func", "batchDelete"); + put("keySize", String.valueOf(keys.size())); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); doSendBatchDelete(backOffer, keys, atomic, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -337,6 +407,7 @@ private void batchDelete(List keys, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -350,10 +421,21 @@ private void batchDelete(List keys, boolean atomic) { public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVReadSlowLogInMS(), + new HashMap(2) { + { + put("func", "getKeyTTL"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { Long result = client.rawGetKeyTTL(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -368,6 +450,7 @@ public Long getKeyTTL(ByteString key) { throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -445,8 +528,21 @@ public List scan(ByteString startKey, ByteString endKey, int limit) { public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVScanSlowLogInMS(), + new HashMap(5) { + { + put("func", "scan"); + put("startKey", KeyUtils.formatBytesUTF8(startKey)); + put("endKey", KeyUtils.formatBytesUTF8(endKey)); + put("limit", String.valueOf(limit)); + put("keyOnly", String.valueOf(keyOnly)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS()); Iterator iterator = rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer); List result = new ArrayList<>(); @@ -458,6 +554,7 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -506,18 +603,37 @@ public List scan(ByteString startKey, ByteString endKey) { public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVScanSlowLogInMS(), + new HashMap(4) { + { + put("func", "scan"); + put("startKey", KeyUtils.formatBytesUTF8(startKey)); + put("endKey", KeyUtils.formatBytesUTF8(endKey)); + put("keyOnly", String.valueOf(keyOnly)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS()); + ByteString newStartKey = startKey; List result = new ArrayList<>(); while (true) { Iterator iterator = rawScanIterator( - conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly, backOffer); + conf, + clientBuilder, + newStartKey, + endKey, + conf.getScanBatchSize(), + keyOnly, + backOffer); if (!iterator.hasNext()) { break; } iterator.forEachRemaining(result::add); - startKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString(); + newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString(); } RAW_REQUEST_SUCCESS.labels(label).inc(); return result; @@ -526,6 +642,7 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -578,10 +695,22 @@ public void deleteAtomic(ByteString key) { private void delete(ByteString key, boolean atomic) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + conf.getRawKVWriteSlowLogInMS(), + new HashMap(3) { + { + put("func", "delete"); + put("key", KeyUtils.formatBytesUTF8(key)); + put("atomic", String.valueOf(atomic)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { client.rawDelete(backOffer, key, atomic); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -596,6 +725,7 @@ private void delete(ByteString key, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); + slowLog.log(); } } @@ -611,8 +741,10 @@ private void delete(ByteString key, boolean atomic) { public synchronized void deleteRange(ByteString startKey, ByteString endKey) { String label = "client_raw_delete_range"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff( + conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS(); doSendDeleteRange(backOffer, startKey, endKey, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 26fa171e471..5fe15dbd228 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -19,6 +19,7 @@ import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; +import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; @@ -187,7 +188,7 @@ public void testCustomBackOff() { public void testDeadlineBackOff() { int timeout = 2000; int sleep = 150; - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE); long s = System.currentTimeMillis(); try { while (true) {