diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 1072df902ab..d442ec66a07 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -744,4 +744,12 @@ private Metapb.Region decodeRegion(Metapb.Region region) { return builder.build(); } + + public long getClusterId() { + return header.getClusterId(); + } + + public List getPdAddrs() { + return pdAddrs; + } } diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index 2a437a854ac..7cd50779456 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -24,15 +24,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SlowLogImpl implements SlowLog { + private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class); private static final int MAX_SPAN_SIZE = 1024; private final List slowLogSpans = new ArrayList<>(); + private final HashMap fields = new HashMap<>(); private Throwable error = null; private final long startMS; @@ -44,15 +47,19 @@ public class SlowLogImpl implements SlowLog { private final long slowThresholdMS; /** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */ - private final Map properties; + private final Map properties; - public SlowLogImpl(long slowThresholdMS, Map properties) { + public SlowLogImpl(long slowThresholdMS, Map properties) { this.startMS = System.currentTimeMillis(); this.startNS = System.nanoTime(); this.slowThresholdMS = slowThresholdMS; this.properties = new HashMap<>(properties); } + public SlowLogImpl(long slowThresholdMS) { + this(slowThresholdMS, new HashMap<>()); + } + @Override public void addProperty(String key, String value) { this.properties.put(key, value); @@ -93,8 +100,23 @@ private String getSlowLogString(long currentMS) { jsonObject.addProperty("error", error.getMessage()); } - for (Map.Entry entry : properties.entrySet()) { - jsonObject.addProperty(entry.getKey(), entry.getValue()); + for (Entry entry : properties.entrySet()) { + Object value = entry.getValue(); + if (value instanceof List) { + JsonArray field = new JsonArray(); + for (Object o : (List) value) { + field.add(o.toString()); + } + jsonObject.add(entry.getKey(), field); + } else if (value instanceof Map) { + JsonObject field = new JsonObject(); + for (Entry e : ((Map) value).entrySet()) { + field.addProperty(e.getKey().toString(), e.getValue().toString()); + } + jsonObject.add(entry.getKey(), field); + } else { + jsonObject.addProperty(entry.getKey(), value.toString()); + } } JsonArray jsonArray = new JsonArray(); diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 419cb2f5f49..69dad0aff0b 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -15,13 +15,29 @@ package org.tikv.raw; -import static org.tikv.common.util.ClientUtils.*; +import static org.tikv.common.util.ClientUtils.appendBatches; +import static org.tikv.common.util.ClientUtils.getBatches; +import static org.tikv.common.util.ClientUtils.getTasks; +import static org.tikv.common.util.ClientUtils.getTasksWithOutput; +import static org.tikv.common.util.ClientUtils.groupKeysByRegion; import com.google.protobuf.ByteString; import io.prometheus.client.Counter; import io.prometheus.client.Histogram; -import java.util.*; -import java.util.concurrent.*; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,10 +53,19 @@ import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; -import org.tikv.common.util.*; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.Batch; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.DeleteRange; +import org.tikv.common.util.HistogramUtils; +import org.tikv.common.util.Pair; +import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClient implements RawKVClientBase { + private final Long clusterId; + private final List pdAddresses; private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; private final ExecutorService batchGetThreadPool; @@ -84,6 +109,8 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete(); this.batchScanThreadPool = session.getThreadPoolForBatchScan(); this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange(); + this.clusterId = session.getPDClient().getClusterId(); + this.pdAddresses = session.getPDClient().getPdAddrs(); } @Override @@ -110,8 +137,10 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "put"); put("key", KeyUtils.formatBytesUTF8(key)); } @@ -152,8 +181,10 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "putIfAbsent"); put("key", KeyUtils.formatBytesUTF8(key)); } @@ -208,8 +239,10 @@ private void batchPut(Map kvPairs, long ttl, boolean ato SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "batchPut"); put("keySize", String.valueOf(kvPairs.size())); } @@ -237,8 +270,10 @@ public ByteString get(ByteString key) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVReadSlowLogInMS(), - new HashMap(2) { + new HashMap(2) { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "get"); put("key", KeyUtils.formatBytesUTF8(key)); } @@ -275,8 +310,10 @@ public List batchGet(List keys) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchReadSlowLogInMS(), - new HashMap(2) { + new HashMap(2) { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "batchGet"); put("keySize", String.valueOf(keys.size())); } @@ -314,8 +351,10 @@ private void batchDelete(List keys, boolean atomic) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "batchDelete"); put("keySize", String.valueOf(keys.size())); } @@ -344,8 +383,10 @@ public Long getKeyTTL(ByteString key) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVReadSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "getKeyTTL"); put("key", KeyUtils.formatBytesUTF8(key)); } @@ -437,8 +478,10 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool SlowLog slowLog = new SlowLogImpl( conf.getRawKVScanSlowLogInMS(), - new HashMap(5) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "scan"); put("startKey", KeyUtils.formatBytesUTF8(startKey)); put("endKey", KeyUtils.formatBytesUTF8(endKey)); @@ -487,8 +530,10 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly SlowLog slowLog = new SlowLogImpl( conf.getRawKVScanSlowLogInMS(), - new HashMap(4) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "scan"); put("startKey", KeyUtils.formatBytesUTF8(startKey)); put("endKey", KeyUtils.formatBytesUTF8(endKey)); @@ -567,8 +612,10 @@ private void delete(ByteString key, boolean atomic) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), - new HashMap(3) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "delete"); put("key", KeyUtils.formatBytesUTF8(key)); put("atomic", String.valueOf(atomic));