From c786eb8852eb42c40632e06678dfd05cafc176e9 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 17 Nov 2021 11:16:30 +0800 Subject: [PATCH 1/5] add slow log Signed-off-by: marsishandsome --- .../org/tikv/common/AbstractGRPCClient.java | 12 +- .../java/org/tikv/common/ConfigUtils.java | 2 + .../java/org/tikv/common/TiConfiguration.java | 10 ++ .../java/org/tikv/common/log/SlowLog.java | 26 +++ .../org/tikv/common/log/SlowLogEmptyImpl.java | 35 ++++ .../java/org/tikv/common/log/SlowLogImpl.java | 83 +++++++++ .../java/org/tikv/common/log/SlowLogSpan.java | 28 +++ .../tikv/common/log/SlowLogSpanEmptyImpl.java | 39 ++++ .../org/tikv/common/log/SlowLogSpanImpl.java | 77 ++++++++ .../org/tikv/common/policy/RetryPolicy.java | 14 +- .../java/org/tikv/common/util/BackOffer.java | 6 + .../tikv/common/util/ConcreteBackOffer.java | 55 ++++-- src/main/java/org/tikv/raw/RawKVClient.java | 170 ++++++++++++++++-- .../java/org/tikv/raw/RawKVClientTest.java | 16 ++ 14 files changed, 540 insertions(+), 33 deletions(-) create mode 100644 src/main/java/org/tikv/common/log/SlowLog.java create mode 100644 src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java create mode 100644 src/main/java/org/tikv/common/log/SlowLogImpl.java create mode 100644 src/main/java/org/tikv/common/log/SlowLogSpan.java create mode 100644 src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java create mode 100644 src/main/java/org/tikv/common/log/SlowLogSpanImpl.java diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index dd886eb0700..a3c4ed5efff 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -90,7 +90,8 @@ public RespT callWithRetry( return ClientCalls.blockingUnaryCall( stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); if (logger.isTraceEnabled()) { logger.trace(String.format("leaving %s...", method.getFullMethodName())); @@ -118,7 +119,8 @@ protected void callAsyncWithRetry( responseObserver); return null; }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); } @@ -139,7 +141,8 @@ StreamObserver callBidiStreamingWithRetry( return asyncBidiStreamingCall( stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); return observer; } @@ -162,7 +165,8 @@ public StreamingResponse callServerStreamingWithRetry( blockingServerStreamingCall( stub.getChannel(), method, stub.getCallOptions(), requestFactory.get())); }, - method.getFullMethodName()); + method.getFullMethodName(), + backOffer); logger.debug(String.format("leaving %s...", method.getFullMethodName())); return response; } diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 7d0223c5529..bb309ec4c6d 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -67,6 +67,7 @@ public class ConfigUtils { public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms"; public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms"; public static final String TIKV_BO_REGION_MISS_BASE_IN_MS = "tikv.bo_region_miss_base_in_ms"; + public static final String TIKV_SLOW_LOG_THRESHOLD = "tikv.slow_log_threshold"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; @@ -110,6 +111,7 @@ public class ConfigUtils { public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000; public static final int DEF_TIKV_BO_REGION_MISS_BASE_IN_MS = 20; + public static final String DEF_TIKV_SLOW_LOG_THRESHOLD = "0.5"; // TODO: default value? public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 614cfbdeaa3..30748e3b85c 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -119,6 +119,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS); + setIfMissing(TIKV_SLOW_LOG_THRESHOLD, DEF_TIKV_SLOW_LOG_THRESHOLD); } public static void listAll() { @@ -315,6 +316,7 @@ private static ReplicaRead getReplicaRead(String key) { private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS); private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); + private double slowLogThreshold = getDouble(TIKV_SLOW_LOG_THRESHOLD); public enum KVMode { TXN, @@ -677,4 +679,12 @@ public int getRawKVCleanTimeoutInMS() { public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) { this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS; } + + public double getSlowLogThreshold() { + return slowLogThreshold; + } + + public void setSlowLogThreshold(double slowLogThreshold) { + this.slowLogThreshold = slowLogThreshold; + } } diff --git a/src/main/java/org/tikv/common/log/SlowLog.java b/src/main/java/org/tikv/common/log/SlowLog.java new file mode 100644 index 00000000000..b4daea9f372 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLog.java @@ -0,0 +1,26 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +public interface SlowLog { + void addProperty(String key, String value); + + SlowLogSpan start(String name); + + void logSlowLog(); +} diff --git a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java new file mode 100644 index 00000000000..0fb0b11779a --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java @@ -0,0 +1,35 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +public class SlowLogEmptyImpl implements SlowLog { + public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl(); + + private SlowLogEmptyImpl() {} + + @Override + public void addProperty(String key, String value) {} + + @Override + public SlowLogSpan start(String name) { + return SlowLogSpanEmptyImpl.INSTANCE; + } + + @Override + public void logSlowLog() {} +} diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java new file mode 100644 index 00000000000..ce8d4246773 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -0,0 +1,83 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SlowLogImpl implements SlowLog { + private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class); + + public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); + + private final List slowLogSpans = new ArrayList<>(); + + private final long startMS; + private final Map properties; + + public SlowLogImpl(Map properties) { + this.startMS = System.currentTimeMillis(); + this.properties = new HashMap<>(properties); + } + + @Override + public void addProperty(String key, String value) { + this.properties.put(key, value); + } + + @Override + public synchronized SlowLogSpan start(String name) { + SlowLogSpan slowLogSpan = new SlowLogSpanImpl(name); + slowLogSpans.add(slowLogSpan); + slowLogSpan.start(); + return slowLogSpan; + } + + @Override + public void logSlowLog() { + logger.warn(getSlowLogString()); + } + + private String getSlowLogString() { + JsonObject jsonObject = new JsonObject(); + long currentMS = System.currentTimeMillis(); + + jsonObject.addProperty("start", DATE_FORMAT.format(startMS)); + jsonObject.addProperty("end", DATE_FORMAT.format(currentMS)); + jsonObject.addProperty("duration", (currentMS - startMS) + "ms"); + + for (Map.Entry entry : properties.entrySet()) { + jsonObject.addProperty(entry.getKey(), entry.getValue()); + } + + JsonArray jsonArray = new JsonArray(); + for (SlowLogSpan slowLogSpan : slowLogSpans) { + jsonArray.add(slowLogSpan.toJsonElement()); + } + jsonObject.add("spans", jsonArray); + + return jsonObject.toString(); + } +} diff --git a/src/main/java/org/tikv/common/log/SlowLogSpan.java b/src/main/java/org/tikv/common/log/SlowLogSpan.java new file mode 100644 index 00000000000..a56e9df1885 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogSpan.java @@ -0,0 +1,28 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import com.google.gson.JsonElement; + +public interface SlowLogSpan { + void start(); + + void end(); + + JsonElement toJsonElement(); +} diff --git a/src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java new file mode 100644 index 00000000000..11231d1c685 --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java @@ -0,0 +1,39 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +public class SlowLogSpanEmptyImpl implements SlowLogSpan { + + public static final SlowLogSpanEmptyImpl INSTANCE = new SlowLogSpanEmptyImpl(); + + private SlowLogSpanEmptyImpl() {} + + @Override + public void start() {} + + @Override + public void end() {} + + @Override + public JsonElement toJsonElement() { + return new JsonObject(); + } +} diff --git a/src/main/java/org/tikv/common/log/SlowLogSpanImpl.java b/src/main/java/org/tikv/common/log/SlowLogSpanImpl.java new file mode 100644 index 00000000000..960baeb41dc --- /dev/null +++ b/src/main/java/org/tikv/common/log/SlowLogSpanImpl.java @@ -0,0 +1,77 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import static org.tikv.common.log.SlowLogImpl.DATE_FORMAT; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +public class SlowLogSpanImpl implements SlowLogSpan { + private final String name; + private long startMS; + private long endMS; + + public SlowLogSpanImpl(String name) { + this.name = name; + this.startMS = 0; + this.endMS = 0; + } + + @Override + public void start() { + this.startMS = System.currentTimeMillis(); + } + + @Override + public void end() { + this.endMS = System.currentTimeMillis(); + } + + @Override + public JsonElement toJsonElement() { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("name", name); + jsonObject.addProperty("start", getStartString()); + jsonObject.addProperty("end", getEndString()); + jsonObject.addProperty("duration", getDurationString()); + + return jsonObject; + } + + private String getStartString() { + if (startMS == 0) { + return "N/A"; + } + return DATE_FORMAT.format(startMS); + } + + private String getEndString() { + if (endMS == 0) { + return "N/A"; + } + return DATE_FORMAT.format(endMS); + } + + private String getDurationString() { + if (startMS == 0 || endMS == 0) { + return "N/A"; + } + return (endMS - startMS) + "ms"; + } +} diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index 73cb9dc0e93..300fa594240 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -21,6 +21,7 @@ import io.prometheus.client.Histogram; import java.util.concurrent.Callable; import org.tikv.common.exception.GrpcException; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.operation.ErrorHandler; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; @@ -67,7 +68,16 @@ private void rethrowNotRecoverableException(Exception e) { } } - public RespT callWithRetry(Callable proc, String methodName) { + public RespT callWithRetry(Callable proc, String methodName, BackOffer backOffer) { + SlowLogSpan slowLogSpan = backOffer.slowLogStart("callWithRetry " + methodName); + try { + return callWithRetry(proc, methodName); + } finally { + slowLogSpan.end(); + } + } + + private RespT callWithRetry(Callable proc, String methodName) { Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer(); try { while (true) { @@ -76,9 +86,11 @@ public RespT callWithRetry(Callable proc, String methodName) { // add single request duration histogram Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer(); + SlowLogSpan slowLogSpan = backOffer.slowLogStart("gRPC " + methodName); try { result = proc.call(); } finally { + slowLogSpan.end(); requestTimer.observeDuration(); } } catch (Exception e) { diff --git a/src/main/java/org/tikv/common/util/BackOffer.java b/src/main/java/org/tikv/common/util/BackOffer.java index b5081a39b5c..2a525b923d2 100644 --- a/src/main/java/org/tikv/common/util/BackOffer.java +++ b/src/main/java/org/tikv/common/util/BackOffer.java @@ -17,6 +17,8 @@ package org.tikv.common.util; +import org.tikv.common.log.SlowLogSpan; + public interface BackOffer { // Back off types. int seconds = 1000; @@ -58,4 +60,8 @@ enum BackOffStrategy { // DecorrJitter increases the maximum jitter based on the last random value. DecorrJitter } + + SlowLogSpan slowLogStart(String span); + + void logSlowLog(); } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 141bb5ed76e..648626cc767 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -29,6 +29,9 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; +import org.tikv.common.log.SlowLog; +import org.tikv.common.log.SlowLogEmptyImpl; +import org.tikv.common.log.SlowLogSpan; public class ConcreteBackOffer implements BackOffer { private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class); @@ -36,7 +39,10 @@ public class ConcreteBackOffer implements BackOffer { private final Map backOffFunctionMap; private final List errors; private int totalSleep; - private final long deadline; + public final long startMS; + public final long timeoutInMs; + public final long deadline; + private final SlowLog slowLog; public static final Histogram BACKOFF_DURATION = Histogram.build() @@ -45,7 +51,8 @@ public class ConcreteBackOffer implements BackOffer { .labelNames("type") .register(); - private ConcreteBackOffer(int maxSleep, long deadline) { + private ConcreteBackOffer( + int maxSleep, long startMS, long timeoutInMs, long deadline, SlowLog slowLog) { Preconditions.checkArgument( maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0."); Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0."); @@ -53,7 +60,10 @@ private ConcreteBackOffer(int maxSleep, long deadline) { this.maxSleep = maxSleep; this.errors = new ArrayList<>(); this.backOffFunctionMap = new HashMap<>(); + this.startMS = startMS; + this.timeoutInMs = timeoutInMs; this.deadline = deadline; + this.slowLog = slowLog; } private ConcreteBackOffer(ConcreteBackOffer source) { @@ -61,40 +71,48 @@ private ConcreteBackOffer(ConcreteBackOffer source) { this.totalSleep = source.totalSleep; this.errors = source.errors; this.backOffFunctionMap = source.backOffFunctionMap; + this.startMS = source.startMS; + this.timeoutInMs = source.timeoutInMs; this.deadline = source.deadline; + this.slowLog = source.slowLog; } public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs) { - long deadline = System.currentTimeMillis() + timeoutInMs; - return new ConcreteBackOffer(0, deadline); + return newDeadlineBackOff(timeoutInMs, SlowLogEmptyImpl.INSTANCE); + } + + public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) { + long startMS = System.currentTimeMillis(); + long deadline = startMS + timeoutInMs; + return new ConcreteBackOffer(0, startMS, timeoutInMs, deadline, slowLog); } public static ConcreteBackOffer newCustomBackOff(int maxSleep) { - return new ConcreteBackOffer(maxSleep, 0); + return new ConcreteBackOffer(maxSleep, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newScannerNextMaxBackOff() { - return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0); + return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newBatchGetMaxBackOff() { - return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0); + return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newCopNextMaxBackOff() { - return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0); + return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newGetBackOff() { - return new ConcreteBackOffer(GET_MAX_BACKOFF, 0); + return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newRawKVBackOff() { - return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0); + return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newTsoBackOff() { - return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0); + return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer create(BackOffer source) { @@ -151,6 +169,7 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { } public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { + SlowLogSpan slowLogSpan = slowLogStart("backoff " + funcType.name()); Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); BackOffFunction backOffFunction = backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc); @@ -171,8 +190,10 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long Thread.sleep(sleep); } catch (InterruptedException e) { throw new GrpcException(e); + } finally { + slowLogSpan.end(); + backOffTimer.observeDuration(); } - backOffTimer.observeDuration(); if (maxSleep > 0 && totalSleep >= maxSleep) { logger.warn(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep)); return false; @@ -206,4 +227,14 @@ private void logThrowError(Exception err) { // Use the last backoff type to generate an exception throw new GrpcException("retry is exhausted.", err); } + + @Override + public SlowLogSpan slowLogStart(String span) { + return slowLog.start(span); + } + + @Override + public void logSlowLog() { + slowLog.logSlowLog(); + } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index fdd96600b96..0acbd372bcb 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -27,8 +27,11 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; +import org.tikv.common.log.SlowLog; +import org.tikv.common.log.SlowLogImpl; import org.tikv.common.operation.iterator.RawScanIterator; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; @@ -129,10 +132,20 @@ public void putAtomic(ByteString key, ByteString value, long ttl) { private void put(ByteString key, ByteString value, long ttl, boolean atomic) { String label = "client_raw_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "put"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { client.rawPut(backOffer, key, value, ttl, atomic); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -147,6 +160,7 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -174,10 +188,20 @@ public ByteString putIfAbsent(ByteString key, ByteString value) { public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "putIfAbsent"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -192,6 +216,7 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -236,9 +261,17 @@ public void batchPutAtomic(Map kvPairs, long ttl) { private void batchPut(Map kvPairs, long ttl, boolean atomic) { String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "batchPut"); + put("keySize", String.valueOf(kvPairs.size())); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); doSendBatchPut(backOffer, kvPairs, ttl, atomic, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -247,6 +280,7 @@ private void batchPut(Map kvPairs, long ttl, boolean ato throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -259,10 +293,21 @@ private void batchPut(Map kvPairs, long ttl, boolean ato public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "get"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { ByteString result = client.rawGet(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -277,6 +322,7 @@ public ByteString get(ByteString key) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -289,9 +335,17 @@ public ByteString get(ByteString key) { public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "batchGet"); + put("keySize", String.valueOf(keys.size())); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog); try { - BackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS(); List result = doSendBatchGet(backOffer, keys, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -301,6 +355,7 @@ public List batchGet(List keys) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -325,9 +380,17 @@ public void batchDeleteAtomic(List keys) { private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "batchDelete"); + put("keySize", String.valueOf(keys.size())); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); doSendBatchDelete(backOffer, keys, atomic, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -337,6 +400,7 @@ private void batchDelete(List keys, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -350,10 +414,20 @@ private void batchDelete(List keys, boolean atomic) { public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "getKeyTTL"); + put("key", KeyUtils.formatBytesUTF8(key)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { Long result = client.rawGetKeyTTL(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -368,6 +442,7 @@ public Long getKeyTTL(ByteString key) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -445,8 +520,20 @@ public List scan(ByteString startKey, ByteString endKey, int limit) { public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "scan"); + put("startKey", KeyUtils.formatBytesUTF8(startKey)); + put("endKey", KeyUtils.formatBytesUTF8(endKey)); + put("limit", String.valueOf(limit)); + put("keyOnly", String.valueOf(keyOnly)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS()); Iterator iterator = rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer); List result = new ArrayList<>(); @@ -458,6 +545,7 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -506,18 +594,36 @@ public List scan(ByteString startKey, ByteString endKey) { public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "scan"); + put("startKey", KeyUtils.formatBytesUTF8(startKey)); + put("endKey", KeyUtils.formatBytesUTF8(endKey)); + put("keyOnly", String.valueOf(keyOnly)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS()); + ByteString newStartKey = startKey; List result = new ArrayList<>(); while (true) { Iterator iterator = rawScanIterator( - conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly, backOffer); + conf, + clientBuilder, + newStartKey, + endKey, + conf.getScanBatchSize(), + keyOnly, + backOffer); if (!iterator.hasNext()) { break; } iterator.forEachRemaining(result::add); - startKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString(); + newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString(); } RAW_REQUEST_SUCCESS.labels(label).inc(); return result; @@ -526,6 +632,7 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -578,10 +685,21 @@ public void deleteAtomic(ByteString key) { private void delete(ByteString key, boolean atomic) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "delete"); + put("key", KeyUtils.formatBytesUTF8(key)); + put("atomic", String.valueOf(atomic)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); + slowLog.addProperty("region", client.getRegion().toString()); try { client.rawDelete(backOffer, key, atomic); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -596,6 +714,7 @@ private void delete(ByteString key, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -611,8 +730,18 @@ private void delete(ByteString key, boolean atomic) { public synchronized void deleteRange(ByteString startKey, ByteString endKey) { String label = "client_raw_delete_range"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + SlowLog slowLog = + new SlowLogImpl( + new HashMap(8) { + { + put("func", "deleteRange"); + put("startKey", KeyUtils.formatBytesUTF8(startKey)); + put("endKey", KeyUtils.formatBytesUTF8(endKey)); + } + }); + ConcreteBackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS(), slowLog); try { - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS()); long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS(); doSendDeleteRange(backOffer, startKey, endKey, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -621,6 +750,7 @@ public synchronized void deleteRange(ByteString startKey, ByteString endKey) { throw e; } finally { requestTimer.observeDuration(); + logSlowLog(backOffer); } } @@ -936,4 +1066,12 @@ private Iterator rawScanIterator( } return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer); } + + private void logSlowLog(ConcreteBackOffer backOffer) { + double slowLogThreshold = conf.getSlowLogThreshold(); + long currentMS = System.currentTimeMillis(); + if ((currentMS - backOffer.startMS) >= slowLogThreshold * backOffer.timeoutInMs) { + backOffer.logSlowLog(); + } + } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 26fa171e471..9f69f1b066a 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -298,6 +298,22 @@ public void deleteRangeTest() { client.deleteRange(ByteString.EMPTY, ByteString.EMPTY); } + /* + iptables -I INPUT -s 192.168.190.70 -p tcp --dport 22366 -j DROP + */ + @Test + public void getTest() { + ByteString key = rawKey("key"); + client.get(key); + } + + @Test + public void batchGetTest() { + // TODO: mars + ByteString key = rawKey("key"); + client.get(key); + } + @Test public void simpleTest() { if (!initialized) return; From 17c878a1c8d551d5e968e4c9e094b9b6ebb30cb3 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 19 Nov 2021 09:55:48 +0800 Subject: [PATCH 2/5] support SwitchLeader and gRPC Forward Signed-off-by: marsishandsome --- .../java/org/tikv/common/log/SlowLog.java | 2 +- .../org/tikv/common/log/SlowLogEmptyImpl.java | 2 +- .../java/org/tikv/common/log/SlowLogImpl.java | 17 +++- .../common/operation/RegionErrorHandler.java | 2 +- .../org/tikv/common/policy/RetryPolicy.java | 14 +-- .../region/AbstractRegionStoreClient.java | 19 ++-- .../common/region/RegionErrorReceiver.java | 4 +- .../java/org/tikv/common/util/BackOffer.java | 6 +- .../tikv/common/util/ConcreteBackOffer.java | 45 +++------ src/main/java/org/tikv/raw/RawKVClient.java | 92 +++++++++---------- .../java/org/tikv/raw/RawKVClientTest.java | 18 +++- 11 files changed, 110 insertions(+), 111 deletions(-) diff --git a/src/main/java/org/tikv/common/log/SlowLog.java b/src/main/java/org/tikv/common/log/SlowLog.java index b4daea9f372..e172a003462 100644 --- a/src/main/java/org/tikv/common/log/SlowLog.java +++ b/src/main/java/org/tikv/common/log/SlowLog.java @@ -22,5 +22,5 @@ public interface SlowLog { SlowLogSpan start(String name); - void logSlowLog(); + void log(); } diff --git a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java index 0fb0b11779a..37e68237db5 100644 --- a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java @@ -31,5 +31,5 @@ public SlowLogSpan start(String name) { } @Override - public void logSlowLog() {} + public void log() {} } diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index ce8d4246773..7286f664400 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -35,10 +35,14 @@ public class SlowLogImpl implements SlowLog { private final List slowLogSpans = new ArrayList<>(); private final long startMS; + private final long timeoutMS; + + /** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */ private final Map properties; - public SlowLogImpl(Map properties) { + public SlowLogImpl(long timeoutMS, Map properties) { this.startMS = System.currentTimeMillis(); + this.timeoutMS = timeoutMS; this.properties = new HashMap<>(properties); } @@ -56,17 +60,20 @@ public synchronized SlowLogSpan start(String name) { } @Override - public void logSlowLog() { - logger.warn(getSlowLogString()); + public void log() { + long currentMS = System.currentTimeMillis(); + if (currentMS - startMS > timeoutMS) { + logger.warn("SlowLog:" + getSlowLogString(currentMS)); + } } - private String getSlowLogString() { + private String getSlowLogString(long currentMS) { JsonObject jsonObject = new JsonObject(); - long currentMS = System.currentTimeMillis(); jsonObject.addProperty("start", DATE_FORMAT.format(startMS)); jsonObject.addProperty("end", DATE_FORMAT.format(currentMS)); jsonObject.addProperty("duration", (currentMS - startMS) + "ms"); + jsonObject.addProperty("timeout", timeoutMS + "ms"); for (Map.Entry entry : properties.entrySet()) { jsonObject.addProperty(entry.getKey(), entry.getValue()); diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 230ceedaa34..f88772d255b 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -217,7 +217,7 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List c @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - if (recv.onStoreUnreachable()) { + if (recv.onStoreUnreachable(backOffer.getSlowLog())) { if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) { regionManager.onRequestFail(recv.getRegion()); throw new GrpcException("retry is exhausted.", e); diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index 300fa594240..f27c1eaaac1 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -69,16 +69,9 @@ private void rethrowNotRecoverableException(Exception e) { } public RespT callWithRetry(Callable proc, String methodName, BackOffer backOffer) { - SlowLogSpan slowLogSpan = backOffer.slowLogStart("callWithRetry " + methodName); - try { - return callWithRetry(proc, methodName); - } finally { - slowLogSpan.end(); - } - } - - private RespT callWithRetry(Callable proc, String methodName) { Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer(); + SlowLogSpan callWithRetrySlowLogSpan = + backOffer.getSlowLog().start("callWithRetry " + methodName); try { while (true) { RespT result = null; @@ -86,7 +79,7 @@ private RespT callWithRetry(Callable proc, String methodName) { // add single request duration histogram Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer(); - SlowLogSpan slowLogSpan = backOffer.slowLogStart("gRPC " + methodName); + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC " + methodName); try { result = proc.call(); } finally { @@ -117,6 +110,7 @@ private RespT callWithRetry(Callable proc, String methodName) { } } finally { callWithRetryTimer.observeDuration(); + callWithRetrySlowLogSpan.end(); } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 96e620279aa..6a32850e598 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -35,6 +35,9 @@ import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; +import org.tikv.common.log.SlowLog; +import org.tikv.common.log.SlowLogEmptyImpl; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.Pair; import org.tikv.kvproto.Kvrpcpb; @@ -80,7 +83,7 @@ protected AbstractRegionStoreClient( if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); } else if (!this.store.isReachable()) { - onStoreUnreachable(); + onStoreUnreachable(SlowLogEmptyImpl.INSTANCE); } } @@ -130,7 +133,7 @@ public boolean onNotLeader(TiRegion newRegion) { } @Override - public boolean onStoreUnreachable() { + public boolean onStoreUnreachable(SlowLog slowLog) { if (!store.isValid()) { logger.warn(String.format("store [%d] has been invalid", store.getId())); store = regionManager.getStoreById(store.getId()); @@ -148,13 +151,13 @@ public boolean onStoreUnreachable() { } // seek an available leader store to send request - Boolean result = seekLeaderStore(); + Boolean result = seekLeaderStore(slowLog); if (result != null) { return result; } if (conf.getEnableGrpcForward()) { // seek an available proxy store to forward request - return seekProxyStore(); + return seekProxyStore(slowLog); } return false; } @@ -187,8 +190,9 @@ private void updateClientStub() { } } - private Boolean seekLeaderStore() { + private Boolean seekLeaderStore(SlowLog slowLog) { Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); + SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore"); try { List peers = region.getFollowerList(); if (peers.isEmpty()) { @@ -237,11 +241,13 @@ private Boolean seekLeaderStore() { } } finally { switchLeaderDurationTimer.observeDuration(); + slowLogSpan.end(); } return null; } - private boolean seekProxyStore() { + private boolean seekProxyStore(SlowLog slowLog) { + SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore"); Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); @@ -259,6 +265,7 @@ private boolean seekProxyStore() { return true; } finally { grpcForwardDurationTimer.observeDuration(); + slowLogSpan.end(); } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 4bee1356eab..307538d4a82 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -17,11 +17,13 @@ package org.tikv.common.region; +import org.tikv.common.log.SlowLog; + public interface RegionErrorReceiver { boolean onNotLeader(TiRegion region); /// return whether we need to retry this request. - boolean onStoreUnreachable(); + boolean onStoreUnreachable(SlowLog slowLog); TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/util/BackOffer.java b/src/main/java/org/tikv/common/util/BackOffer.java index 2a525b923d2..91b3d7c0dfe 100644 --- a/src/main/java/org/tikv/common/util/BackOffer.java +++ b/src/main/java/org/tikv/common/util/BackOffer.java @@ -17,7 +17,7 @@ package org.tikv.common.util; -import org.tikv.common.log.SlowLogSpan; +import org.tikv.common.log.SlowLog; public interface BackOffer { // Back off types. @@ -61,7 +61,5 @@ enum BackOffStrategy { DecorrJitter } - SlowLogSpan slowLogStart(String span); - - void logSlowLog(); + SlowLog getSlowLog(); } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 648626cc767..89c79c30783 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -39,9 +39,7 @@ public class ConcreteBackOffer implements BackOffer { private final Map backOffFunctionMap; private final List errors; private int totalSleep; - public final long startMS; - public final long timeoutInMs; - public final long deadline; + private final long deadline; private final SlowLog slowLog; public static final Histogram BACKOFF_DURATION = @@ -51,8 +49,7 @@ public class ConcreteBackOffer implements BackOffer { .labelNames("type") .register(); - private ConcreteBackOffer( - int maxSleep, long startMS, long timeoutInMs, long deadline, SlowLog slowLog) { + private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog) { Preconditions.checkArgument( maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0."); Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0."); @@ -60,8 +57,6 @@ private ConcreteBackOffer( this.maxSleep = maxSleep; this.errors = new ArrayList<>(); this.backOffFunctionMap = new HashMap<>(); - this.startMS = startMS; - this.timeoutInMs = timeoutInMs; this.deadline = deadline; this.slowLog = slowLog; } @@ -71,48 +66,41 @@ private ConcreteBackOffer(ConcreteBackOffer source) { this.totalSleep = source.totalSleep; this.errors = source.errors; this.backOffFunctionMap = source.backOffFunctionMap; - this.startMS = source.startMS; - this.timeoutInMs = source.timeoutInMs; this.deadline = source.deadline; this.slowLog = source.slowLog; } - public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs) { - return newDeadlineBackOff(timeoutInMs, SlowLogEmptyImpl.INSTANCE); - } - public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) { - long startMS = System.currentTimeMillis(); - long deadline = startMS + timeoutInMs; - return new ConcreteBackOffer(0, startMS, timeoutInMs, deadline, slowLog); + long deadline = System.currentTimeMillis() + timeoutInMs; + return new ConcreteBackOffer(0, deadline, slowLog); } public static ConcreteBackOffer newCustomBackOff(int maxSleep) { - return new ConcreteBackOffer(maxSleep, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newScannerNextMaxBackOff() { - return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newBatchGetMaxBackOff() { - return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newCopNextMaxBackOff() { - return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newGetBackOff() { - return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newRawKVBackOff() { - return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer newTsoBackOff() { - return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, 0, 0, SlowLogEmptyImpl.INSTANCE); + return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); } public static ConcreteBackOffer create(BackOffer source) { @@ -169,7 +157,7 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { } public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { - SlowLogSpan slowLogSpan = slowLogStart("backoff " + funcType.name()); + SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); BackOffFunction backOffFunction = backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc); @@ -229,12 +217,7 @@ private void logThrowError(Exception err) { } @Override - public SlowLogSpan slowLogStart(String span) { - return slowLog.start(span); - } - - @Override - public void logSlowLog() { - slowLog.logSlowLog(); + public SlowLog getSlowLog() { + return slowLog; } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 0acbd372bcb..9b87b1faaef 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -31,6 +31,7 @@ import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.log.SlowLog; +import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.log.SlowLogImpl; import org.tikv.common.operation.iterator.RawScanIterator; import org.tikv.common.region.RegionStoreClient; @@ -133,8 +134,9 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { String label = "client_raw_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVWriteTimeoutInMS(), + new HashMap(2) { { put("func", "put"); put("key", KeyUtils.formatBytesUTF8(key)); @@ -160,7 +162,7 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -189,8 +191,9 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVWriteTimeoutInMS(), + new HashMap(2) { { put("func", "putIfAbsent"); put("key", KeyUtils.formatBytesUTF8(key)); @@ -216,7 +219,7 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -262,8 +265,9 @@ private void batchPut(Map kvPairs, long ttl, boolean ato String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVBatchWriteTimeoutInMS(), + new HashMap(2) { { put("func", "batchPut"); put("keySize", String.valueOf(kvPairs.size())); @@ -280,7 +284,7 @@ private void batchPut(Map kvPairs, long ttl, boolean ato throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -294,8 +298,9 @@ public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVReadTimeoutInMS(), + new HashMap(2) { { put("func", "get"); put("key", KeyUtils.formatBytesUTF8(key)); @@ -322,7 +327,7 @@ public ByteString get(ByteString key) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -336,8 +341,9 @@ public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVBatchReadTimeoutInMS(), + new HashMap(2) { { put("func", "batchGet"); put("keySize", String.valueOf(keys.size())); @@ -355,7 +361,7 @@ public List batchGet(List keys) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -381,8 +387,9 @@ private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVBatchWriteTimeoutInMS(), + new HashMap(2) { { put("func", "batchDelete"); put("keySize", String.valueOf(keys.size())); @@ -400,7 +407,7 @@ private void batchDelete(List keys, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -415,8 +422,9 @@ public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVReadTimeoutInMS(), + new HashMap(2) { { put("func", "getKeyTTL"); put("key", KeyUtils.formatBytesUTF8(key)); @@ -442,7 +450,7 @@ public Long getKeyTTL(ByteString key) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -521,8 +529,9 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVScanTimeoutInMS(), + new HashMap(5) { { put("func", "scan"); put("startKey", KeyUtils.formatBytesUTF8(startKey)); @@ -545,7 +554,7 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -595,8 +604,9 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVScanTimeoutInMS(), + new HashMap(4) { { put("func", "scan"); put("startKey", KeyUtils.formatBytesUTF8(startKey)); @@ -632,7 +642,7 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -686,8 +696,9 @@ private void delete(ByteString key, boolean atomic) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { + newSlowLog( + conf.getRawKVWriteTimeoutInMS(), + new HashMap(3) { { put("func", "delete"); put("key", KeyUtils.formatBytesUTF8(key)); @@ -714,7 +725,7 @@ private void delete(ByteString key, boolean atomic) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); + slowLog.log(); } } @@ -730,17 +741,8 @@ private void delete(ByteString key, boolean atomic) { public synchronized void deleteRange(ByteString startKey, ByteString endKey) { String label = "client_raw_delete_range"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = - new SlowLogImpl( - new HashMap(8) { - { - put("func", "deleteRange"); - put("startKey", KeyUtils.formatBytesUTF8(startKey)); - put("endKey", KeyUtils.formatBytesUTF8(endKey)); - } - }); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS(), slowLog); + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE); try { long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS(); doSendDeleteRange(backOffer, startKey, endKey, deadline); @@ -750,7 +752,6 @@ public synchronized void deleteRange(ByteString startKey, ByteString endKey) { throw e; } finally { requestTimer.observeDuration(); - logSlowLog(backOffer); } } @@ -1067,11 +1068,8 @@ private Iterator rawScanIterator( return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer); } - private void logSlowLog(ConcreteBackOffer backOffer) { - double slowLogThreshold = conf.getSlowLogThreshold(); - long currentMS = System.currentTimeMillis(); - if ((currentMS - backOffer.startMS) >= slowLogThreshold * backOffer.timeoutInMs) { - backOffer.logSlowLog(); - } + private SlowLog newSlowLog(long timeoutMS, Map properties) { + long slowLogTimeoutMS = (long) (conf.getSlowLogThreshold() * timeoutMS); + return new SlowLogImpl(slowLogTimeoutMS, properties); } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 9f69f1b066a..747dffb1ba1 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -19,6 +19,7 @@ import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; +import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; @@ -187,7 +188,7 @@ public void testCustomBackOff() { public void testDeadlineBackOff() { int timeout = 2000; int sleep = 150; - BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE); long s = System.currentTimeMillis(); try { while (true) { @@ -303,15 +304,24 @@ public void deleteRangeTest() { */ @Test public void getTest() { - ByteString key = rawKey("key"); + ByteString key = rawKey("key49"); + client.get(key); client.get(key); } @Test public void batchGetTest() { // TODO: mars - ByteString key = rawKey("key"); - client.get(key); + List keys = new ArrayList<>(); + for(int i = 49; i < 50; i ++) { + ByteString key = rawKey("key" + i); + keys.add(key); + client.get(key); + client.put(key, key); + } + + //client.batchGet(keys); + //client.batchGet(keys); } @Test From 5303520250cd4b3e9092b5589b601bcf0ae507cb Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 19 Nov 2021 13:09:29 +0800 Subject: [PATCH 3/5] add slow log time Signed-off-by: marsishandsome --- .../java/org/tikv/common/ConfigUtils.java | 10 +++- .../java/org/tikv/common/TiConfiguration.java | 54 ++++++++++++++++--- .../java/org/tikv/common/log/SlowLogImpl.java | 8 ++- src/main/java/org/tikv/raw/RawKVClient.java | 48 ++++++++--------- .../java/org/tikv/raw/RawKVClientTest.java | 25 --------- 5 files changed, 84 insertions(+), 61 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index bb309ec4c6d..f400242ef43 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -67,7 +67,13 @@ public class ConfigUtils { public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms"; public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms"; public static final String TIKV_BO_REGION_MISS_BASE_IN_MS = "tikv.bo_region_miss_base_in_ms"; - public static final String TIKV_SLOW_LOG_THRESHOLD = "tikv.slow_log_threshold"; + public static final String TIKV_RAWKV_READ_SLOWLOG_IN_MS = "tikv.rawkv.read_slowlog_in_ms"; + public static final String TIKV_RAWKV_WRITE_SLOWLOG_IN_MS = "tikv.rawkv.write_slowlog_in_ms"; + public static final String TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS = + "tikv.rawkv.batch_read_slowlog_in_ms"; + public static final String TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS = + "tikv.rawkv.batch_write_slowlog_in_ms"; + public static final String TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "tikv.rawkv.scan_slowlog_in_ms"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; @@ -111,7 +117,7 @@ public class ConfigUtils { public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000; public static final int DEF_TIKV_BO_REGION_MISS_BASE_IN_MS = 20; - public static final String DEF_TIKV_SLOW_LOG_THRESHOLD = "0.5"; // TODO: default value? + public static final String DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "5000"; public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 30748e3b85c..54e96224017 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -119,7 +119,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS); - setIfMissing(TIKV_SLOW_LOG_THRESHOLD, DEF_TIKV_SLOW_LOG_THRESHOLD); + setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); } public static void listAll() { @@ -170,6 +170,10 @@ public static int getInt(String key) { return Integer.parseInt(get(key)); } + public static Optional getIntOption(String key) { + return getOption(key).map(Integer::parseInt); + } + private static int getInt(String key, int defaultValue) { try { return getOption(key).map(Integer::parseInt).orElse(defaultValue); @@ -316,7 +320,13 @@ private static ReplicaRead getReplicaRead(String key) { private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS); private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); - private double slowLogThreshold = getDouble(TIKV_SLOW_LOG_THRESHOLD); + private Optional rawKVReadSlowLogInMS = getIntOption(TIKV_RAWKV_READ_SLOWLOG_IN_MS); + private Optional rawKVWriteSlowLogInMS = getIntOption(TIKV_RAWKV_WRITE_SLOWLOG_IN_MS); + private Optional rawKVBatchReadSlowLogInMS = + getIntOption(TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS); + private Optional rawKVBatchWriteSlowLogInMS = + getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS); + private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); public enum KVMode { TXN, @@ -680,11 +690,43 @@ public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) { this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS; } - public double getSlowLogThreshold() { - return slowLogThreshold; + public Integer getRawKVReadSlowLogInMS() { + return rawKVReadSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVReadSlowLogInMS(Integer rawKVReadSlowLogInMS) { + this.rawKVReadSlowLogInMS = Optional.of(rawKVReadSlowLogInMS); + } + + public Integer getRawKVWriteSlowLogInMS() { + return rawKVWriteSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVWriteSlowLogInMS(Integer rawKVWriteSlowLogInMS) { + this.rawKVWriteSlowLogInMS = Optional.of(rawKVWriteSlowLogInMS); + } + + public Integer getRawKVBatchReadSlowLogInMS() { + return rawKVBatchReadSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVBatchReadSlowLogInMS(Integer rawKVBatchReadSlowLogInMS) { + this.rawKVBatchReadSlowLogInMS = Optional.of(rawKVBatchReadSlowLogInMS); + } + + public Integer getRawKVBatchWriteSlowLogInMS() { + return rawKVBatchWriteSlowLogInMS.orElse((int) (getTimeout() * 2)); + } + + public void setRawKVBatchWriteSlowLogInMS(Integer rawKVBatchWriteSlowLogInMS) { + this.rawKVBatchWriteSlowLogInMS = Optional.of(rawKVBatchWriteSlowLogInMS); + } + + public int getRawKVScanSlowLogInMS() { + return rawKVScanSlowLogInMS; } - public void setSlowLogThreshold(double slowLogThreshold) { - this.slowLogThreshold = slowLogThreshold; + public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) { + this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS; } } diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index 7286f664400..94ac60b19eb 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -30,6 +30,8 @@ public class SlowLogImpl implements SlowLog { private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class); + private static final int MAX_SPAN_SIZE = 1024; + public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); private final List slowLogSpans = new ArrayList<>(); @@ -54,7 +56,9 @@ public void addProperty(String key, String value) { @Override public synchronized SlowLogSpan start(String name) { SlowLogSpan slowLogSpan = new SlowLogSpanImpl(name); - slowLogSpans.add(slowLogSpan); + if (slowLogSpans.size() < MAX_SPAN_SIZE) { + slowLogSpans.add(slowLogSpan); + } slowLogSpan.start(); return slowLogSpan; } @@ -62,7 +66,7 @@ public synchronized SlowLogSpan start(String name) { @Override public void log() { long currentMS = System.currentTimeMillis(); - if (currentMS - startMS > timeoutMS) { + if (timeoutMS >= 0 && currentMS - startMS > timeoutMS) { logger.warn("SlowLog:" + getSlowLogString(currentMS)); } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 9b87b1faaef..e488ab71886 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -134,8 +134,8 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { String label = "client_raw_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVWriteTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVWriteSlowLogInMS(), new HashMap(2) { { put("func", "put"); @@ -191,8 +191,8 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVWriteTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVWriteSlowLogInMS(), new HashMap(2) { { put("func", "putIfAbsent"); @@ -265,8 +265,8 @@ private void batchPut(Map kvPairs, long ttl, boolean ato String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVBatchWriteTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVBatchWriteSlowLogInMS(), new HashMap(2) { { put("func", "batchPut"); @@ -298,8 +298,8 @@ public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVReadTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVReadSlowLogInMS(), new HashMap(2) { { put("func", "get"); @@ -341,8 +341,8 @@ public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVBatchReadTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVBatchReadSlowLogInMS(), new HashMap(2) { { put("func", "batchGet"); @@ -387,8 +387,8 @@ private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVBatchWriteTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVBatchWriteSlowLogInMS(), new HashMap(2) { { put("func", "batchDelete"); @@ -422,8 +422,8 @@ public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVReadTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVReadSlowLogInMS(), new HashMap(2) { { put("func", "getKeyTTL"); @@ -529,8 +529,8 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVScanTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVScanSlowLogInMS(), new HashMap(5) { { put("func", "scan"); @@ -604,8 +604,8 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVScanTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVScanSlowLogInMS(), new HashMap(4) { { put("func", "scan"); @@ -696,8 +696,8 @@ private void delete(ByteString key, boolean atomic) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); SlowLog slowLog = - newSlowLog( - conf.getRawKVWriteTimeoutInMS(), + new SlowLogImpl( + conf.getRawKVWriteSlowLogInMS(), new HashMap(3) { { put("func", "delete"); @@ -742,7 +742,8 @@ public synchronized void deleteRange(ByteString startKey, ByteString endKey) { String label = "client_raw_delete_range"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); ConcreteBackOffer backOffer = - ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE); + ConcreteBackOffer.newDeadlineBackOff( + conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE); try { long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS(); doSendDeleteRange(backOffer, startKey, endKey, deadline); @@ -1067,9 +1068,4 @@ private Iterator rawScanIterator( } return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer); } - - private SlowLog newSlowLog(long timeoutMS, Map properties) { - long slowLogTimeoutMS = (long) (conf.getSlowLogThreshold() * timeoutMS); - return new SlowLogImpl(slowLogTimeoutMS, properties); - } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index 747dffb1ba1..5fe15dbd228 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -299,31 +299,6 @@ public void deleteRangeTest() { client.deleteRange(ByteString.EMPTY, ByteString.EMPTY); } - /* - iptables -I INPUT -s 192.168.190.70 -p tcp --dport 22366 -j DROP - */ - @Test - public void getTest() { - ByteString key = rawKey("key49"); - client.get(key); - client.get(key); - } - - @Test - public void batchGetTest() { - // TODO: mars - List keys = new ArrayList<>(); - for(int i = 49; i < 50; i ++) { - ByteString key = rawKey("key" + i); - keys.add(key); - client.get(key); - client.put(key, key); - } - - //client.batchGet(keys); - //client.batchGet(keys); - } - @Test public void simpleTest() { if (!initialized) return; From dcbc172c6f9ca55de165f2c8f0053fe5e49bb64c Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 19 Nov 2021 16:05:52 +0800 Subject: [PATCH 4/5] rename timeout to threshold Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/log/SlowLogImpl.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index 94ac60b19eb..e2bb8426261 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -37,14 +37,14 @@ public class SlowLogImpl implements SlowLog { private final List slowLogSpans = new ArrayList<>(); private final long startMS; - private final long timeoutMS; + private final long thresholdMS; /** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */ private final Map properties; - public SlowLogImpl(long timeoutMS, Map properties) { + public SlowLogImpl(long thresholdMS, Map properties) { this.startMS = System.currentTimeMillis(); - this.timeoutMS = timeoutMS; + this.thresholdMS = thresholdMS; this.properties = new HashMap<>(properties); } @@ -66,7 +66,7 @@ public synchronized SlowLogSpan start(String name) { @Override public void log() { long currentMS = System.currentTimeMillis(); - if (timeoutMS >= 0 && currentMS - startMS > timeoutMS) { + if (thresholdMS >= 0 && currentMS - startMS > thresholdMS) { logger.warn("SlowLog:" + getSlowLogString(currentMS)); } } @@ -77,7 +77,7 @@ private String getSlowLogString(long currentMS) { jsonObject.addProperty("start", DATE_FORMAT.format(startMS)); jsonObject.addProperty("end", DATE_FORMAT.format(currentMS)); jsonObject.addProperty("duration", (currentMS - startMS) + "ms"); - jsonObject.addProperty("timeout", timeoutMS + "ms"); + jsonObject.addProperty("threshold", thresholdMS + "ms"); for (Map.Entry entry : properties.entrySet()) { jsonObject.addProperty(entry.getKey(), entry.getValue()); From 56849040dba8b2b12c5e24fe8e57189bc95361f7 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 19 Nov 2021 16:38:35 +0800 Subject: [PATCH 5/5] address code review Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/log/SlowLogImpl.java | 9 ++++----- .../tikv/common/region/AbstractRegionStoreClient.java | 2 ++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index e2bb8426261..76ed4c922bf 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -37,14 +37,14 @@ public class SlowLogImpl implements SlowLog { private final List slowLogSpans = new ArrayList<>(); private final long startMS; - private final long thresholdMS; + private final long slowThresholdMS; /** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */ private final Map properties; - public SlowLogImpl(long thresholdMS, Map properties) { + public SlowLogImpl(long slowThresholdMS, Map properties) { this.startMS = System.currentTimeMillis(); - this.thresholdMS = thresholdMS; + this.slowThresholdMS = slowThresholdMS; this.properties = new HashMap<>(properties); } @@ -66,7 +66,7 @@ public synchronized SlowLogSpan start(String name) { @Override public void log() { long currentMS = System.currentTimeMillis(); - if (thresholdMS >= 0 && currentMS - startMS > thresholdMS) { + if (slowThresholdMS >= 0 && currentMS - startMS > slowThresholdMS) { logger.warn("SlowLog:" + getSlowLogString(currentMS)); } } @@ -77,7 +77,6 @@ private String getSlowLogString(long currentMS) { jsonObject.addProperty("start", DATE_FORMAT.format(startMS)); jsonObject.addProperty("end", DATE_FORMAT.format(currentMS)); jsonObject.addProperty("duration", (currentMS - startMS) + "ms"); - jsonObject.addProperty("threshold", thresholdMS + "ms"); for (Map.Entry entry : properties.entrySet()) { jsonObject.addProperty(entry.getKey(), entry.getValue()); diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 6a32850e598..8b102f04fc5 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -83,6 +83,8 @@ protected AbstractRegionStoreClient( if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); } else if (!this.store.isReachable()) { + // cannot get Deadline or SlowLog instance here + // use SlowLogEmptyImpl instead to skip slow log record onStoreUnreachable(SlowLogEmptyImpl.INSTANCE); } }