Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public <ReqT, RespT> RespT callWithRetry(
return ClientCalls.blockingUnaryCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);

if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
Expand Down Expand Up @@ -118,7 +119,8 @@ protected <ReqT, RespT> void callAsyncWithRetry(
responseObserver);
return null;
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
}

Expand All @@ -139,7 +141,8 @@ <ReqT, RespT> StreamObserver<ReqT> callBidiStreamingWithRetry(
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return observer;
}
Expand All @@ -162,7 +165,8 @@ public <ReqT, RespT> StreamingResponse callServerStreamingWithRetry(
blockingServerStreamingCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return response;
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ public class ConfigUtils {
public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms";
public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms";
public static final String TIKV_BO_REGION_MISS_BASE_IN_MS = "tikv.bo_region_miss_base_in_ms";
public static final String TIKV_RAWKV_READ_SLOWLOG_IN_MS = "tikv.rawkv.read_slowlog_in_ms";
public static final String TIKV_RAWKV_WRITE_SLOWLOG_IN_MS = "tikv.rawkv.write_slowlog_in_ms";
public static final String TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS =
"tikv.rawkv.batch_read_slowlog_in_ms";
public static final String TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS =
"tikv.rawkv.batch_write_slowlog_in_ms";
public static final String TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "tikv.rawkv.scan_slowlog_in_ms";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
Expand Down Expand Up @@ -110,6 +117,7 @@ public class ConfigUtils {
public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000;

public static final int DEF_TIKV_BO_REGION_MISS_BASE_IN_MS = 20;
public static final String DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "5000";

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS);
setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
}

public static void listAll() {
Expand Down Expand Up @@ -169,6 +170,10 @@ public static int getInt(String key) {
return Integer.parseInt(get(key));
}

public static Optional<Integer> getIntOption(String key) {
return getOption(key).map(Integer::parseInt);
}

private static int getInt(String key, int defaultValue) {
try {
return getOption(key).map(Integer::parseInt).orElse(defaultValue);
Expand Down Expand Up @@ -315,6 +320,13 @@ private static ReplicaRead getReplicaRead(String key) {
private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS);
private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
private Optional<Integer> rawKVReadSlowLogInMS = getIntOption(TIKV_RAWKV_READ_SLOWLOG_IN_MS);
private Optional<Integer> rawKVWriteSlowLogInMS = getIntOption(TIKV_RAWKV_WRITE_SLOWLOG_IN_MS);
private Optional<Integer> rawKVBatchReadSlowLogInMS =
getIntOption(TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS);
private Optional<Integer> rawKVBatchWriteSlowLogInMS =
getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS);
private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);

public enum KVMode {
TXN,
Expand Down Expand Up @@ -677,4 +689,44 @@ public int getRawKVCleanTimeoutInMS() {
public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) {
this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS;
}

public Integer getRawKVReadSlowLogInMS() {
return rawKVReadSlowLogInMS.orElse((int) (getTimeout() * 2));
}

public void setRawKVReadSlowLogInMS(Integer rawKVReadSlowLogInMS) {
this.rawKVReadSlowLogInMS = Optional.of(rawKVReadSlowLogInMS);
}

public Integer getRawKVWriteSlowLogInMS() {
return rawKVWriteSlowLogInMS.orElse((int) (getTimeout() * 2));
}

public void setRawKVWriteSlowLogInMS(Integer rawKVWriteSlowLogInMS) {
this.rawKVWriteSlowLogInMS = Optional.of(rawKVWriteSlowLogInMS);
}

public Integer getRawKVBatchReadSlowLogInMS() {
return rawKVBatchReadSlowLogInMS.orElse((int) (getTimeout() * 2));
}

public void setRawKVBatchReadSlowLogInMS(Integer rawKVBatchReadSlowLogInMS) {
this.rawKVBatchReadSlowLogInMS = Optional.of(rawKVBatchReadSlowLogInMS);
}

public Integer getRawKVBatchWriteSlowLogInMS() {
return rawKVBatchWriteSlowLogInMS.orElse((int) (getTimeout() * 2));
}

public void setRawKVBatchWriteSlowLogInMS(Integer rawKVBatchWriteSlowLogInMS) {
this.rawKVBatchWriteSlowLogInMS = Optional.of(rawKVBatchWriteSlowLogInMS);
}

public int getRawKVScanSlowLogInMS() {
return rawKVScanSlowLogInMS;
}

public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) {
this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS;
}
}
26 changes: 26 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.common.log;

public interface SlowLog {
void addProperty(String key, String value);

SlowLogSpan start(String name);

void log();
}
35 changes: 35 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.common.log;

public class SlowLogEmptyImpl implements SlowLog {
public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl();

private SlowLogEmptyImpl() {}

@Override
public void addProperty(String key, String value) {}

@Override
public SlowLogSpan start(String name) {
return SlowLogSpanEmptyImpl.INSTANCE;
}

@Override
public void log() {}
}
93 changes: 93 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.common.log;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowLogImpl implements SlowLog {
private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class);

private static final int MAX_SPAN_SIZE = 1024;

public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");

private final List<SlowLogSpan> slowLogSpans = new ArrayList<>();

private final long startMS;
private final long slowThresholdMS;

/** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */
private final Map<String, String> properties;

public SlowLogImpl(long slowThresholdMS, Map<String, String> properties) {
this.startMS = System.currentTimeMillis();
this.slowThresholdMS = slowThresholdMS;
this.properties = new HashMap<>(properties);
}

@Override
public void addProperty(String key, String value) {
this.properties.put(key, value);
}

@Override
public synchronized SlowLogSpan start(String name) {
SlowLogSpan slowLogSpan = new SlowLogSpanImpl(name);
if (slowLogSpans.size() < MAX_SPAN_SIZE) {
slowLogSpans.add(slowLogSpan);
}
slowLogSpan.start();
return slowLogSpan;
}

@Override
public void log() {
long currentMS = System.currentTimeMillis();
if (slowThresholdMS >= 0 && currentMS - startMS > slowThresholdMS) {
logger.warn("SlowLog:" + getSlowLogString(currentMS));
}
}

private String getSlowLogString(long currentMS) {
JsonObject jsonObject = new JsonObject();

jsonObject.addProperty("start", DATE_FORMAT.format(startMS));
jsonObject.addProperty("end", DATE_FORMAT.format(currentMS));
jsonObject.addProperty("duration", (currentMS - startMS) + "ms");

for (Map.Entry<String, String> entry : properties.entrySet()) {
jsonObject.addProperty(entry.getKey(), entry.getValue());
}

JsonArray jsonArray = new JsonArray();
for (SlowLogSpan slowLogSpan : slowLogSpans) {
jsonArray.add(slowLogSpan.toJsonElement());
}
jsonObject.add("spans", jsonArray);

return jsonObject.toString();
}
}
28 changes: 28 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogSpan.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.common.log;

import com.google.gson.JsonElement;

public interface SlowLogSpan {
void start();

void end();

JsonElement toJsonElement();
}
39 changes: 39 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogSpanEmptyImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.common.log;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

public class SlowLogSpanEmptyImpl implements SlowLogSpan {

public static final SlowLogSpanEmptyImpl INSTANCE = new SlowLogSpanEmptyImpl();

private SlowLogSpanEmptyImpl() {}

@Override
public void start() {}

@Override
public void end() {}

@Override
public JsonElement toJsonElement() {
return new JsonObject();
}
}
Loading