Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -744,4 +744,12 @@ private Metapb.Region decodeRegion(Metapb.Region region) {

return builder.build();
}

public long getClusterId() {
return header.getClusterId();
}

public List<URI> getPdAddrs() {
return pdAddrs;
}
}
30 changes: 26 additions & 4 deletions src/main/java/org/tikv/common/log/SlowLogImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowLogImpl implements SlowLog {

private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class);

private static final int MAX_SPAN_SIZE = 1024;

private final List<SlowLogSpan> slowLogSpans = new ArrayList<>();
private final HashMap<String, Object> fields = new HashMap<>();
private Throwable error = null;

private final long startMS;
Expand All @@ -44,15 +47,19 @@ public class SlowLogImpl implements SlowLog {
private final long slowThresholdMS;

/** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */
private final Map<String, String> properties;
private final Map<String, Object> properties;

public SlowLogImpl(long slowThresholdMS, Map<String, String> properties) {
public SlowLogImpl(long slowThresholdMS, Map<String, Object> properties) {
this.startMS = System.currentTimeMillis();
this.startNS = System.nanoTime();
this.slowThresholdMS = slowThresholdMS;
this.properties = new HashMap<>(properties);
}

public SlowLogImpl(long slowThresholdMS) {
this(slowThresholdMS, new HashMap<>());
}

@Override
public void addProperty(String key, String value) {
this.properties.put(key, value);
Expand Down Expand Up @@ -93,8 +100,23 @@ private String getSlowLogString(long currentMS) {
jsonObject.addProperty("error", error.getMessage());
}

for (Map.Entry<String, String> entry : properties.entrySet()) {
jsonObject.addProperty(entry.getKey(), entry.getValue());
for (Entry<String, Object> entry : properties.entrySet()) {
Object value = entry.getValue();
if (value instanceof List) {
JsonArray field = new JsonArray();
for (Object o : (List<?>) value) {
field.add(o.toString());
}
jsonObject.add(entry.getKey(), field);
} else if (value instanceof Map) {
JsonObject field = new JsonObject();
for (Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
field.addProperty(e.getKey().toString(), e.getValue().toString());
}
jsonObject.add(entry.getKey(), field);
} else {
jsonObject.addProperty(entry.getKey(), value.toString());
}
}

JsonArray jsonArray = new JsonArray();
Expand Down
75 changes: 61 additions & 14 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,29 @@

package org.tikv.raw;

import static org.tikv.common.util.ClientUtils.*;
import static org.tikv.common.util.ClientUtils.appendBatches;
import static org.tikv.common.util.ClientUtils.getBatches;
import static org.tikv.common.util.ClientUtils.getTasks;
import static org.tikv.common.util.ClientUtils.getTasksWithOutput;
import static org.tikv.common.util.ClientUtils.groupKeysByRegion;

import com.google.protobuf.ByteString;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.concurrent.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,10 +53,19 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.*;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Batch;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.DeleteRange;
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb.KvPair;

public class RawKVClient implements RawKVClientBase {
private final Long clusterId;
private final List<URI> pdAddresses;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorService batchGetThreadPool;
Expand Down Expand Up @@ -84,6 +109,8 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete();
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
this.clusterId = session.getPDClient().getClusterId();
this.pdAddresses = session.getPDClient().getPdAddrs();
}

@Override
Expand All @@ -110,8 +137,10 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) {
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(2) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "put");
put("key", KeyUtils.formatBytesUTF8(key));
}
Expand Down Expand Up @@ -152,8 +181,10 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(2) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "putIfAbsent");
put("key", KeyUtils.formatBytesUTF8(key));
}
Expand Down Expand Up @@ -208,8 +239,10 @@ private void batchPut(Map<ByteString, ByteString> kvPairs, long ttl, boolean ato
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchWriteSlowLogInMS(),
new HashMap<String, String>(2) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "batchPut");
put("keySize", String.valueOf(kvPairs.size()));
}
Expand Down Expand Up @@ -237,8 +270,10 @@ public ByteString get(ByteString key) {
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVReadSlowLogInMS(),
new HashMap<String, String>(2) {
new HashMap<String, Object>(2) {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "get");
put("key", KeyUtils.formatBytesUTF8(key));
}
Expand Down Expand Up @@ -275,8 +310,10 @@ public List<KvPair> batchGet(List<ByteString> keys) {
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchReadSlowLogInMS(),
new HashMap<String, String>(2) {
new HashMap<String, Object>(2) {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "batchGet");
put("keySize", String.valueOf(keys.size()));
}
Expand Down Expand Up @@ -314,8 +351,10 @@ private void batchDelete(List<ByteString> keys, boolean atomic) {
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchWriteSlowLogInMS(),
new HashMap<String, String>(2) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "batchDelete");
put("keySize", String.valueOf(keys.size()));
}
Expand Down Expand Up @@ -344,8 +383,10 @@ public Long getKeyTTL(ByteString key) {
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVReadSlowLogInMS(),
new HashMap<String, String>(2) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "getKeyTTL");
put("key", KeyUtils.formatBytesUTF8(key));
}
Expand Down Expand Up @@ -437,8 +478,10 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, bool
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVScanSlowLogInMS(),
new HashMap<String, String>(5) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "scan");
put("startKey", KeyUtils.formatBytesUTF8(startKey));
put("endKey", KeyUtils.formatBytesUTF8(endKey));
Expand Down Expand Up @@ -487,8 +530,10 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVScanSlowLogInMS(),
new HashMap<String, String>(4) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "scan");
put("startKey", KeyUtils.formatBytesUTF8(startKey));
put("endKey", KeyUtils.formatBytesUTF8(endKey));
Expand Down Expand Up @@ -567,8 +612,10 @@ private void delete(ByteString key, boolean atomic) {
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(3) {
new HashMap<String, Object>() {
{
put("cluster_id", clusterId);
put("pd_addresses", pdAddresses);
put("func", "delete");
put("key", KeyUtils.formatBytesUTF8(key));
put("atomic", String.valueOf(atomic));
Expand Down