diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index a9c4c5c9c84..4b8a283c40c 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -840,4 +840,12 @@ private Metapb.Region decodeRegion(Metapb.Region region) { return builder.build(); } + + public long getClusterId() { + return header.getClusterId(); + } + + public List getPdAddrs() { + return pdAddrs; + } } diff --git a/src/main/java/org/tikv/common/log/SlowLog.java b/src/main/java/org/tikv/common/log/SlowLog.java index 965266e8064..ad137100c01 100644 --- a/src/main/java/org/tikv/common/log/SlowLog.java +++ b/src/main/java/org/tikv/common/log/SlowLog.java @@ -17,7 +17,11 @@ package org.tikv.common.log; +import com.google.common.collect.ImmutableMap; +import java.util.Map; + public interface SlowLog { + SlowLogSpan start(String name); long getTraceId(); @@ -26,5 +30,11 @@ public interface SlowLog { void setError(Throwable err); + SlowLog withFields(Map fields); + + default SlowLog withField(String key, Object value) { + return withFields(ImmutableMap.of(key, value)); + } + void log(); } diff --git a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java index a1959d96b32..ed4bbb1b834 100644 --- a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java @@ -17,6 +17,8 @@ package org.tikv.common.log; +import java.util.Map; + public class SlowLogEmptyImpl implements SlowLog { public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl(); @@ -40,6 +42,11 @@ public long getThresholdMS() { @Override public void setError(Throwable err) {} + @Override + public SlowLog withFields(Map fields) { + return this; + } + @Override public void log() {} } diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index c6efcf457a7..0c75211263f 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -22,12 +22,16 @@ import java.math.BigInteger; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SlowLogImpl implements SlowLog { + private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class); private static final int MAX_SPAN_SIZE = 1024; @@ -35,6 +39,7 @@ public class SlowLogImpl implements SlowLog { private static final Random random = new Random(); private final List slowLogSpans = new ArrayList<>(); + private final HashMap fields = new HashMap<>(); private Throwable error = null; private final long startMS; @@ -81,6 +86,12 @@ public void setError(Throwable err) { this.error = err; } + @Override + public SlowLog withFields(Map fields) { + this.fields.putAll(fields); + return this; + } + @Override public void log() { recordTime(); @@ -120,6 +131,25 @@ JsonObject getSlowLogJson() { } jsonObject.add("spans", jsonArray); + for (Entry entry : fields.entrySet()) { + Object value = entry.getValue(); + if (value instanceof List) { + JsonArray field = new JsonArray(); + for (Object o : (List) value) { + field.add(o.toString()); + } + jsonObject.add(entry.getKey(), field); + } else if (value instanceof Map) { + JsonObject field = new JsonObject(); + for (Entry e : ((Map) value).entrySet()) { + field.addProperty(e.getKey().toString(), e.getValue().toString()); + } + jsonObject.add(entry.getKey(), field); + } else { + jsonObject.addProperty(entry.getKey(), value.toString()); + } + } + return jsonObject; } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 950f4774ddf..d4bbeebef78 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -17,13 +17,31 @@ package org.tikv.raw; -import static org.tikv.common.util.ClientUtils.*; +import static org.tikv.common.util.ClientUtils.appendBatches; +import static org.tikv.common.util.ClientUtils.genUUID; +import static org.tikv.common.util.ClientUtils.getBatches; +import static org.tikv.common.util.ClientUtils.getTasks; +import static org.tikv.common.util.ClientUtils.getTasksWithOutput; +import static org.tikv.common.util.ClientUtils.groupKeysByRegion; import com.google.protobuf.ByteString; import io.prometheus.client.Counter; import io.prometheus.client.Histogram; -import java.util.*; -import java.util.concurrent.*; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,10 +62,19 @@ import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; -import org.tikv.common.util.*; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.Batch; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.DeleteRange; +import org.tikv.common.util.HistogramUtils; +import org.tikv.common.util.Pair; +import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClient implements RawKVClientBase { + private final long clusterId; + private final List pdAddresses; private final TiSession tiSession; private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; @@ -95,6 +122,12 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { this.batchScanThreadPool = session.getThreadPoolForBatchScan(); this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange(); this.atomicForCAS = conf.isEnableAtomicForCAS(); + this.clusterId = session.getPDClient().getClusterId(); + this.pdAddresses = session.getPDClient().getPdAddrs(); + } + + private SlowLog withClusterInfo(SlowLog logger) { + return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses); } @Override @@ -110,7 +143,7 @@ public void put(ByteString key, ByteString value, long ttl) { String label = "client_raw_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("put"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); @@ -172,7 +205,7 @@ public void compareAndSet( String label = "client_raw_compare_and_set"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("putIfAbsent"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); @@ -211,7 +244,7 @@ public void batchPut(Map kvPairs, long ttl) { String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("batchPut"); span.addProperty("keySize", String.valueOf(kvPairs.size())); @@ -237,7 +270,7 @@ public Optional get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); SlowLogSpan span = slowLog.start("get"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); @@ -270,7 +303,7 @@ public Optional get(ByteString key) { public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS())); SlowLogSpan span = slowLog.start("batchGet"); span.addProperty("keySize", String.valueOf(keys.size())); ConcreteBackOffer backOffer = @@ -295,7 +328,7 @@ public List batchGet(List keys) { public void batchDelete(List keys) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("batchDelete"); span.addProperty("keySize", String.valueOf(keys.size())); ConcreteBackOffer backOffer = @@ -320,7 +353,7 @@ public void batchDelete(List keys) { public Optional getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); SlowLogSpan span = slowLog.start("getKeyTTL"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ConcreteBackOffer backOffer = @@ -428,7 +461,7 @@ public List scan(ByteString startKey, ByteString endKey, int limit) { public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); SlowLogSpan span = slowLog.start("scan"); span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); @@ -473,7 +506,7 @@ public List scan(ByteString startKey, ByteString endKey) { public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); SlowLogSpan span = slowLog.start("scan"); span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); @@ -539,7 +572,7 @@ public List scanPrefix(ByteString prefixKey, boolean keyOnly) { public void delete(ByteString key) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); - SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()); + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLogSpan span = slowLog.start("delete"); span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ConcreteBackOffer backOffer = diff --git a/src/test/java/org/tikv/common/log/SlowLogImplTest.java b/src/test/java/org/tikv/common/log/SlowLogImplTest.java index 23f103575bc..f8b3ad2b307 100644 --- a/src/test/java/org/tikv/common/log/SlowLogImplTest.java +++ b/src/test/java/org/tikv/common/log/SlowLogImplTest.java @@ -17,8 +17,11 @@ package org.tikv.common.log; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; @@ -63,4 +66,24 @@ public void testUnsignedLong() { Assert.assertEquals("18446744073709551615", SlowLogImpl.toUnsignedBigInteger(-1L).toString()); Assert.assertEquals("18446744073709551614", SlowLogImpl.toUnsignedBigInteger(-2L).toString()); } + + @Test + public void testWithFields() throws InterruptedException { + SlowLogImpl slowLog = new SlowLogImpl(1); + slowLog + .withField("key0", "value0") + .withField("key1", ImmutableList.of("value0", "value1")) + .withField("key2", ImmutableMap.of("key3", "value3")); + + JsonObject object = slowLog.getSlowLogJson(); + Assert.assertEquals("value0", object.get("key0").getAsString()); + + AtomicInteger i = new AtomicInteger(); + object + .get("key1") + .getAsJsonArray() + .forEach(e -> Assert.assertEquals("value" + (i.getAndIncrement()), e.getAsString())); + + Assert.assertEquals("value3", object.get("key2").getAsJsonObject().get("key3").getAsString()); + } }