From a019f76e5c7d0d6c08201733fe818e80347fcfad Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Mon, 21 Mar 2022 17:39:18 +0800 Subject: [PATCH 1/2] cherry pick #557 to release-3.1 Signed-off-by: ti-srebot --- src/main/java/org/tikv/common/PDClient.java | 8 ++ .../java/org/tikv/common/log/SlowLog.java | 14 +++ .../org/tikv/common/log/SlowLogEmptyImpl.java | 7 ++ .../java/org/tikv/common/log/SlowLogImpl.java | 36 ++++++ src/main/java/org/tikv/raw/RawKVClient.java | 117 +++++++++++++++++- .../org/tikv/common/log/SlowLogImplTest.java | 89 +++++++++++++ 6 files changed, 267 insertions(+), 4 deletions(-) create mode 100644 src/test/java/org/tikv/common/log/SlowLogImplTest.java diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 1072df902ab..d442ec66a07 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -744,4 +744,12 @@ private Metapb.Region decodeRegion(Metapb.Region region) { return builder.build(); } + + public long getClusterId() { + return header.getClusterId(); + } + + public List getPdAddrs() { + return pdAddrs; + } } diff --git a/src/main/java/org/tikv/common/log/SlowLog.java b/src/main/java/org/tikv/common/log/SlowLog.java index 007f3f1d736..bb5c1d12f66 100644 --- a/src/main/java/org/tikv/common/log/SlowLog.java +++ b/src/main/java/org/tikv/common/log/SlowLog.java @@ -17,12 +17,26 @@ package org.tikv.common.log; +import com.google.common.collect.ImmutableMap; +import java.util.Map; + public interface SlowLog { +<<<<<<< HEAD void addProperty(String key, String value); +======= + + SlowLogSpan start(String name); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) SlowLogSpan start(String name); void setError(Throwable err); + SlowLog withFields(Map fields); + + default SlowLog withField(String key, Object value) { + return withFields(ImmutableMap.of(key, value)); + } + void log(); } diff --git a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java index 5e76031f7c4..35299a0b12b 100644 --- a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java @@ -17,6 +17,8 @@ package org.tikv.common.log; +import java.util.Map; + public class SlowLogEmptyImpl implements SlowLog { public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl(); @@ -33,6 +35,11 @@ public SlowLogSpan start(String name) { @Override public void setError(Throwable err) {} + @Override + public SlowLog withFields(Map fields) { + return this; + } + @Override public void log() {} } diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index 2a437a854ac..a97093d8704 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -24,15 +24,22 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +<<<<<<< HEAD +======= +import java.util.Map.Entry; +import java.util.Random; +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SlowLogImpl implements SlowLog { + private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class); private static final int MAX_SPAN_SIZE = 1024; private final List slowLogSpans = new ArrayList<>(); + private final HashMap fields = new HashMap<>(); private Throwable error = null; private final long startMS; @@ -73,6 +80,12 @@ public void setError(Throwable err) { this.error = err; } + @Override + public SlowLog withFields(Map fields) { + this.fields.putAll(fields); + return this; + } + @Override public void log() { long currentNS = System.nanoTime(); @@ -103,7 +116,30 @@ private String getSlowLogString(long currentMS) { } jsonObject.add("spans", jsonArray); +<<<<<<< HEAD return jsonObject.toString(); +======= + for (Entry entry : fields.entrySet()) { + Object value = entry.getValue(); + if (value instanceof List) { + JsonArray field = new JsonArray(); + for (Object o : (List) value) { + field.add(o.toString()); + } + jsonObject.add(entry.getKey(), field); + } else if (value instanceof Map) { + JsonObject field = new JsonObject(); + for (Entry e : ((Map) value).entrySet()) { + field.addProperty(e.getKey().toString(), e.getValue().toString()); + } + jsonObject.add(entry.getKey(), field); + } else { + jsonObject.addProperty(entry.getKey(), value.toString()); + } + } + + return jsonObject; +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) } public static SimpleDateFormat getSimpleDateFormat() { diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 419cb2f5f49..cdab1361c6f 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -15,13 +15,31 @@ package org.tikv.raw; -import static org.tikv.common.util.ClientUtils.*; +import static org.tikv.common.util.ClientUtils.appendBatches; +import static org.tikv.common.util.ClientUtils.genUUID; +import static org.tikv.common.util.ClientUtils.getBatches; +import static org.tikv.common.util.ClientUtils.getTasks; +import static org.tikv.common.util.ClientUtils.getTasksWithOutput; +import static org.tikv.common.util.ClientUtils.groupKeysByRegion; import com.google.protobuf.ByteString; import io.prometheus.client.Counter; import io.prometheus.client.Histogram; -import java.util.*; -import java.util.concurrent.*; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,10 +55,23 @@ import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; -import org.tikv.common.util.*; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.Batch; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.DeleteRange; +import org.tikv.common.util.HistogramUtils; +import org.tikv.common.util.Pair; +import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClient implements RawKVClientBase { +<<<<<<< HEAD +======= + private final long clusterId; + private final List pdAddresses; + private final TiSession tiSession; +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; private final ExecutorService batchGetThreadPool; @@ -84,6 +115,16 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete(); this.batchScanThreadPool = session.getThreadPoolForBatchScan(); this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange(); +<<<<<<< HEAD +======= + this.atomicForCAS = conf.isEnableAtomicForCAS(); + this.clusterId = session.getPDClient().getClusterId(); + this.pdAddresses = session.getPDClient().getPdAddrs(); + } + + private SlowLog withClusterInfo(SlowLog logger) { + return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) } @Override @@ -99,10 +140,16 @@ public void put(ByteString key, ByteString value, long ttl) { put(key, value, ttl, false); } +<<<<<<< HEAD @Override public void putAtomic(ByteString key, ByteString value, long ttl) { put(key, value, ttl, true); } +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); + SlowLogSpan span = slowLog.start("put"); + span.addProperty("key", KeyUtils.formatBytesUTF8(key)); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) private void put(ByteString key, ByteString value, long ttl, boolean atomic) { String label = "client_raw_put"; @@ -149,6 +196,7 @@ public ByteString putIfAbsent(ByteString key, ByteString value) { public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), @@ -158,6 +206,13 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { put("key", KeyUtils.formatBytesUTF8(key)); } }); +======= + + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); + SlowLogSpan span = slowLog.start("putIfAbsent"); + span.addProperty("key", KeyUtils.formatBytesUTF8(key)); + +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { @@ -192,6 +247,7 @@ public void batchPut(Map kvPairs, long ttl) { batchPut(kvPairs, ttl, false); } +<<<<<<< HEAD @Override public void batchPutAtomic(Map kvPairs) { batchPutAtomic(kvPairs, 0); @@ -201,6 +257,11 @@ public void batchPutAtomic(Map kvPairs) { public void batchPutAtomic(Map kvPairs, long ttl) { batchPut(kvPairs, ttl, true); } +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); + SlowLogSpan span = slowLog.start("batchPut"); + span.addProperty("keySize", String.valueOf(kvPairs.size())); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) private void batchPut(Map kvPairs, long ttl, boolean atomic) { String label = "client_raw_batch_put"; @@ -234,6 +295,7 @@ private void batchPut(Map kvPairs, long ttl, boolean ato public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVReadSlowLogInMS(), @@ -243,6 +305,12 @@ public ByteString get(ByteString key) { put("key", KeyUtils.formatBytesUTF8(key)); } }); +======= + + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); + SlowLogSpan span = slowLog.start("get"); + span.addProperty("key", KeyUtils.formatBytesUTF8(key)); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); @@ -272,6 +340,7 @@ public ByteString get(ByteString key) { public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchReadSlowLogInMS(), @@ -281,6 +350,11 @@ public List batchGet(List keys) { put("keySize", String.valueOf(keys.size())); } }); +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS())); + SlowLogSpan span = slowLog.start("batchGet"); + span.addProperty("keySize", String.valueOf(keys.size())); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog); try { @@ -311,6 +385,7 @@ public void batchDeleteAtomic(List keys) { private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchWriteSlowLogInMS(), @@ -320,6 +395,11 @@ private void batchDelete(List keys, boolean atomic) { put("keySize", String.valueOf(keys.size())); } }); +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); + SlowLogSpan span = slowLog.start("batchDelete"); + span.addProperty("keySize", String.valueOf(keys.size())); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); try { @@ -341,6 +421,7 @@ private void batchDelete(List keys, boolean atomic) { public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVReadSlowLogInMS(), @@ -350,6 +431,11 @@ public Long getKeyTTL(ByteString key) { put("key", KeyUtils.formatBytesUTF8(key)); } }); +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); + SlowLogSpan span = slowLog.start("getKeyTTL"); + span.addProperty("key", KeyUtils.formatBytesUTF8(key)); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { @@ -434,6 +520,7 @@ public List scan(ByteString startKey, ByteString endKey, int limit) { public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVScanSlowLogInMS(), @@ -446,6 +533,14 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool put("keyOnly", String.valueOf(keyOnly)); } }); +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); + SlowLogSpan span = slowLog.start("scan"); + span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); + span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); + span.addProperty("limit", String.valueOf(limit)); + span.addProperty("keyOnly", String.valueOf(keyOnly)); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { @@ -484,6 +579,7 @@ public List scan(ByteString startKey, ByteString endKey) { public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVScanSlowLogInMS(), @@ -495,6 +591,13 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly put("keyOnly", String.valueOf(keyOnly)); } }); +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); + SlowLogSpan span = slowLog.start("scan"); + span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); + span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); + span.addProperty("keyOnly", String.valueOf(keyOnly)); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { @@ -564,6 +667,7 @@ public void deleteAtomic(ByteString key) { private void delete(ByteString key, boolean atomic) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); +<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), @@ -574,6 +678,11 @@ private void delete(ByteString key, boolean atomic) { put("atomic", String.valueOf(atomic)); } }); +======= + SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); + SlowLogSpan span = slowLog.start("delete"); + span.addProperty("key", KeyUtils.formatBytesUTF8(key)); +>>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { diff --git a/src/test/java/org/tikv/common/log/SlowLogImplTest.java b/src/test/java/org/tikv/common/log/SlowLogImplTest.java new file mode 100644 index 00000000000..f8b3ad2b307 --- /dev/null +++ b/src/test/java/org/tikv/common/log/SlowLogImplTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.log; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Test; + +public class SlowLogImplTest { + + @Test + public void testThresholdTime() throws InterruptedException { + SlowLogImpl slowLog = new SlowLogImpl(1000); + Thread.sleep(1100); + slowLog.log(); + Assert.assertTrue(slowLog.timeExceeded()); + + slowLog = new SlowLogImpl(1000); + Thread.sleep(500); + slowLog.log(); + Assert.assertFalse(slowLog.timeExceeded()); + + slowLog = new SlowLogImpl(-1); + Thread.sleep(500); + slowLog.log(); + Assert.assertFalse(slowLog.timeExceeded()); + } + + @Test + public void testSlowLogJson() throws InterruptedException { + SlowLogImpl slowLog = new SlowLogImpl(1); + SlowLogSpan span = slowLog.start("method1"); + Thread.sleep(500); + span.end(); + JsonObject object = slowLog.getSlowLogJson(); + + JsonArray spans = object.get("spans").getAsJsonArray(); + Assert.assertEquals(1, spans.size()); + JsonObject spanObject = spans.get(0).getAsJsonObject(); + Assert.assertEquals("method1", spanObject.get("event").getAsString()); + Assert.assertTrue(spanObject.get("duration_ms").getAsLong() >= 500); + } + + @Test + public void testUnsignedLong() { + Assert.assertEquals("12345", SlowLogImpl.toUnsignedBigInteger(12345L).toString()); + Assert.assertEquals("18446744073709551615", SlowLogImpl.toUnsignedBigInteger(-1L).toString()); + Assert.assertEquals("18446744073709551614", SlowLogImpl.toUnsignedBigInteger(-2L).toString()); + } + + @Test + public void testWithFields() throws InterruptedException { + SlowLogImpl slowLog = new SlowLogImpl(1); + slowLog + .withField("key0", "value0") + .withField("key1", ImmutableList.of("value0", "value1")) + .withField("key2", ImmutableMap.of("key3", "value3")); + + JsonObject object = slowLog.getSlowLogJson(); + Assert.assertEquals("value0", object.get("key0").getAsString()); + + AtomicInteger i = new AtomicInteger(); + object + .get("key1") + .getAsJsonArray() + .forEach(e -> Assert.assertEquals("value" + (i.getAndIncrement()), e.getAsString())); + + Assert.assertEquals("value3", object.get("key2").getAsJsonObject().get("key3").getAsString()); + } +} From 8520253d7a37e04235eb02127adf86408a527e65 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Tue, 22 Mar 2022 18:08:39 +0800 Subject: [PATCH 2/2] rawkv: fix conflict with old slow interface Signed-off-by: iosmanthus --- .../java/org/tikv/common/log/SlowLog.java | 14 -- .../org/tikv/common/log/SlowLogEmptyImpl.java | 7 - .../java/org/tikv/common/log/SlowLogImpl.java | 42 ++---- src/main/java/org/tikv/raw/RawKVClient.java | 124 +++++------------- .../org/tikv/common/log/SlowLogImplTest.java | 89 ------------- 5 files changed, 45 insertions(+), 231 deletions(-) delete mode 100644 src/test/java/org/tikv/common/log/SlowLogImplTest.java diff --git a/src/main/java/org/tikv/common/log/SlowLog.java b/src/main/java/org/tikv/common/log/SlowLog.java index bb5c1d12f66..007f3f1d736 100644 --- a/src/main/java/org/tikv/common/log/SlowLog.java +++ b/src/main/java/org/tikv/common/log/SlowLog.java @@ -17,26 +17,12 @@ package org.tikv.common.log; -import com.google.common.collect.ImmutableMap; -import java.util.Map; - public interface SlowLog { -<<<<<<< HEAD void addProperty(String key, String value); -======= - - SlowLogSpan start(String name); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) SlowLogSpan start(String name); void setError(Throwable err); - SlowLog withFields(Map fields); - - default SlowLog withField(String key, Object value) { - return withFields(ImmutableMap.of(key, value)); - } - void log(); } diff --git a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java index 35299a0b12b..5e76031f7c4 100644 --- a/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java @@ -17,8 +17,6 @@ package org.tikv.common.log; -import java.util.Map; - public class SlowLogEmptyImpl implements SlowLog { public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl(); @@ -35,11 +33,6 @@ public SlowLogSpan start(String name) { @Override public void setError(Throwable err) {} - @Override - public SlowLog withFields(Map fields) { - return this; - } - @Override public void log() {} } diff --git a/src/main/java/org/tikv/common/log/SlowLogImpl.java b/src/main/java/org/tikv/common/log/SlowLogImpl.java index a97093d8704..7cd50779456 100644 --- a/src/main/java/org/tikv/common/log/SlowLogImpl.java +++ b/src/main/java/org/tikv/common/log/SlowLogImpl.java @@ -24,11 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -<<<<<<< HEAD -======= import java.util.Map.Entry; -import java.util.Random; ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,15 +47,19 @@ public class SlowLogImpl implements SlowLog { private final long slowThresholdMS; /** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */ - private final Map properties; + private final Map properties; - public SlowLogImpl(long slowThresholdMS, Map properties) { + public SlowLogImpl(long slowThresholdMS, Map properties) { this.startMS = System.currentTimeMillis(); this.startNS = System.nanoTime(); this.slowThresholdMS = slowThresholdMS; this.properties = new HashMap<>(properties); } + public SlowLogImpl(long slowThresholdMS) { + this(slowThresholdMS, new HashMap<>()); + } + @Override public void addProperty(String key, String value) { this.properties.put(key, value); @@ -80,12 +80,6 @@ public void setError(Throwable err) { this.error = err; } - @Override - public SlowLog withFields(Map fields) { - this.fields.putAll(fields); - return this; - } - @Override public void log() { long currentNS = System.nanoTime(); @@ -106,20 +100,7 @@ private String getSlowLogString(long currentMS) { jsonObject.addProperty("error", error.getMessage()); } - for (Map.Entry entry : properties.entrySet()) { - jsonObject.addProperty(entry.getKey(), entry.getValue()); - } - - JsonArray jsonArray = new JsonArray(); - for (SlowLogSpan slowLogSpan : slowLogSpans) { - jsonArray.add(slowLogSpan.toJsonElement()); - } - jsonObject.add("spans", jsonArray); - -<<<<<<< HEAD - return jsonObject.toString(); -======= - for (Entry entry : fields.entrySet()) { + for (Entry entry : properties.entrySet()) { Object value = entry.getValue(); if (value instanceof List) { JsonArray field = new JsonArray(); @@ -138,8 +119,13 @@ private String getSlowLogString(long currentMS) { } } - return jsonObject; ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) + JsonArray jsonArray = new JsonArray(); + for (SlowLogSpan slowLogSpan : slowLogSpans) { + jsonArray.add(slowLogSpan.toJsonElement()); + } + jsonObject.add("spans", jsonArray); + + return jsonObject.toString(); } public static SimpleDateFormat getSimpleDateFormat() { diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index cdab1361c6f..69dad0aff0b 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -16,7 +16,6 @@ package org.tikv.raw; import static org.tikv.common.util.ClientUtils.appendBatches; -import static org.tikv.common.util.ClientUtils.genUUID; import static org.tikv.common.util.ClientUtils.getBatches; import static org.tikv.common.util.ClientUtils.getTasks; import static org.tikv.common.util.ClientUtils.getTasksWithOutput; @@ -33,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -66,12 +64,8 @@ import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClient implements RawKVClientBase { -<<<<<<< HEAD -======= - private final long clusterId; + private final Long clusterId; private final List pdAddresses; - private final TiSession tiSession; ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; private final ExecutorService batchGetThreadPool; @@ -115,18 +109,10 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete(); this.batchScanThreadPool = session.getThreadPoolForBatchScan(); this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange(); -<<<<<<< HEAD -======= - this.atomicForCAS = conf.isEnableAtomicForCAS(); this.clusterId = session.getPDClient().getClusterId(); this.pdAddresses = session.getPDClient().getPdAddrs(); } - private SlowLog withClusterInfo(SlowLog logger) { - return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) - } - @Override public void close() {} @@ -140,16 +126,10 @@ public void put(ByteString key, ByteString value, long ttl) { put(key, value, ttl, false); } -<<<<<<< HEAD @Override public void putAtomic(ByteString key, ByteString value, long ttl) { put(key, value, ttl, true); } -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); - SlowLogSpan span = slowLog.start("put"); - span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) private void put(ByteString key, ByteString value, long ttl, boolean atomic) { String label = "client_raw_put"; @@ -157,8 +137,10 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "put"); put("key", KeyUtils.formatBytesUTF8(key)); } @@ -196,23 +178,17 @@ public ByteString putIfAbsent(ByteString key, ByteString value) { public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "putIfAbsent"); put("key", KeyUtils.formatBytesUTF8(key)); } }); -======= - - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); - SlowLogSpan span = slowLog.start("putIfAbsent"); - span.addProperty("key", KeyUtils.formatBytesUTF8(key)); - ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { @@ -247,7 +223,6 @@ public void batchPut(Map kvPairs, long ttl) { batchPut(kvPairs, ttl, false); } -<<<<<<< HEAD @Override public void batchPutAtomic(Map kvPairs) { batchPutAtomic(kvPairs, 0); @@ -257,11 +232,6 @@ public void batchPutAtomic(Map kvPairs) { public void batchPutAtomic(Map kvPairs, long ttl) { batchPut(kvPairs, ttl, true); } -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); - SlowLogSpan span = slowLog.start("batchPut"); - span.addProperty("keySize", String.valueOf(kvPairs.size())); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) private void batchPut(Map kvPairs, long ttl, boolean atomic) { String label = "client_raw_batch_put"; @@ -269,8 +239,10 @@ private void batchPut(Map kvPairs, long ttl, boolean ato SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "batchPut"); put("keySize", String.valueOf(kvPairs.size())); } @@ -295,22 +267,17 @@ private void batchPut(Map kvPairs, long ttl, boolean ato public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVReadSlowLogInMS(), - new HashMap(2) { + new HashMap(2) { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "get"); put("key", KeyUtils.formatBytesUTF8(key)); } }); -======= - - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); - SlowLogSpan span = slowLog.start("get"); - span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); @@ -340,21 +307,17 @@ public ByteString get(ByteString key) { public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchReadSlowLogInMS(), - new HashMap(2) { + new HashMap(2) { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "batchGet"); put("keySize", String.valueOf(keys.size())); } }); -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS())); - SlowLogSpan span = slowLog.start("batchGet"); - span.addProperty("keySize", String.valueOf(keys.size())); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog); try { @@ -385,21 +348,17 @@ public void batchDeleteAtomic(List keys) { private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVBatchWriteSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "batchDelete"); put("keySize", String.valueOf(keys.size())); } }); -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); - SlowLogSpan span = slowLog.start("batchDelete"); - span.addProperty("keySize", String.valueOf(keys.size())); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); try { @@ -421,21 +380,17 @@ private void batchDelete(List keys, boolean atomic) { public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVReadSlowLogInMS(), - new HashMap(2) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "getKeyTTL"); put("key", KeyUtils.formatBytesUTF8(key)); } }); -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); - SlowLogSpan span = slowLog.start("getKeyTTL"); - span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); try { @@ -520,12 +475,13 @@ public List scan(ByteString startKey, ByteString endKey, int limit) { public List scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVScanSlowLogInMS(), - new HashMap(5) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "scan"); put("startKey", KeyUtils.formatBytesUTF8(startKey)); put("endKey", KeyUtils.formatBytesUTF8(endKey)); @@ -533,14 +489,6 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool put("keyOnly", String.valueOf(keyOnly)); } }); -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); - SlowLogSpan span = slowLog.start("scan"); - span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); - span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); - span.addProperty("limit", String.valueOf(limit)); - span.addProperty("keyOnly", String.valueOf(keyOnly)); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { @@ -579,25 +527,19 @@ public List scan(ByteString startKey, ByteString endKey) { public List scan(ByteString startKey, ByteString endKey, boolean keyOnly) { String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVScanSlowLogInMS(), - new HashMap(4) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "scan"); put("startKey", KeyUtils.formatBytesUTF8(startKey)); put("endKey", KeyUtils.formatBytesUTF8(endKey)); put("keyOnly", String.valueOf(keyOnly)); } }); -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); - SlowLogSpan span = slowLog.start("scan"); - span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); - span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); - span.addProperty("keyOnly", String.valueOf(keyOnly)); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); try { @@ -667,22 +609,18 @@ public void deleteAtomic(ByteString key) { private void delete(ByteString key, boolean atomic) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); -<<<<<<< HEAD SlowLog slowLog = new SlowLogImpl( conf.getRawKVWriteSlowLogInMS(), - new HashMap(3) { + new HashMap() { { + put("cluster_id", clusterId); + put("pd_addresses", pdAddresses); put("func", "delete"); put("key", KeyUtils.formatBytesUTF8(key)); put("atomic", String.valueOf(atomic)); } }); -======= - SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); - SlowLogSpan span = slowLog.start("delete"); - span.addProperty("key", KeyUtils.formatBytesUTF8(key)); ->>>>>>> d354ffc99... [to #556] slowlog: attach cluster_id and pd_addresses to slow log properties (#557) ConcreteBackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); try { diff --git a/src/test/java/org/tikv/common/log/SlowLogImplTest.java b/src/test/java/org/tikv/common/log/SlowLogImplTest.java deleted file mode 100644 index f8b3ad2b307..00000000000 --- a/src/test/java/org/tikv/common/log/SlowLogImplTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2022 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.common.log; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; -import org.junit.Test; - -public class SlowLogImplTest { - - @Test - public void testThresholdTime() throws InterruptedException { - SlowLogImpl slowLog = new SlowLogImpl(1000); - Thread.sleep(1100); - slowLog.log(); - Assert.assertTrue(slowLog.timeExceeded()); - - slowLog = new SlowLogImpl(1000); - Thread.sleep(500); - slowLog.log(); - Assert.assertFalse(slowLog.timeExceeded()); - - slowLog = new SlowLogImpl(-1); - Thread.sleep(500); - slowLog.log(); - Assert.assertFalse(slowLog.timeExceeded()); - } - - @Test - public void testSlowLogJson() throws InterruptedException { - SlowLogImpl slowLog = new SlowLogImpl(1); - SlowLogSpan span = slowLog.start("method1"); - Thread.sleep(500); - span.end(); - JsonObject object = slowLog.getSlowLogJson(); - - JsonArray spans = object.get("spans").getAsJsonArray(); - Assert.assertEquals(1, spans.size()); - JsonObject spanObject = spans.get(0).getAsJsonObject(); - Assert.assertEquals("method1", spanObject.get("event").getAsString()); - Assert.assertTrue(spanObject.get("duration_ms").getAsLong() >= 500); - } - - @Test - public void testUnsignedLong() { - Assert.assertEquals("12345", SlowLogImpl.toUnsignedBigInteger(12345L).toString()); - Assert.assertEquals("18446744073709551615", SlowLogImpl.toUnsignedBigInteger(-1L).toString()); - Assert.assertEquals("18446744073709551614", SlowLogImpl.toUnsignedBigInteger(-2L).toString()); - } - - @Test - public void testWithFields() throws InterruptedException { - SlowLogImpl slowLog = new SlowLogImpl(1); - slowLog - .withField("key0", "value0") - .withField("key1", ImmutableList.of("value0", "value1")) - .withField("key2", ImmutableMap.of("key3", "value3")); - - JsonObject object = slowLog.getSlowLogJson(); - Assert.assertEquals("value0", object.get("key0").getAsString()); - - AtomicInteger i = new AtomicInteger(); - object - .get("key1") - .getAsJsonArray() - .forEach(e -> Assert.assertEquals("value" + (i.getAndIncrement()), e.getAsString())); - - Assert.assertEquals("value3", object.get("key2").getAsJsonObject().get("key3").getAsString()); - } -}