Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,12 @@ private Metapb.Region decodeRegion(Metapb.Region region) {

return builder.build();
}

public long getClusterId() {
return header.getClusterId();
}

public List<URI> getPdAddrs() {
return pdAddrs;
}
}
10 changes: 10 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.tikv.common.log;

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public interface SlowLog {

SlowLogSpan start(String name);

long getTraceId();
Expand All @@ -26,5 +30,11 @@ public interface SlowLog {

void setError(Throwable err);

SlowLog withFields(Map<String, Object> fields);

default SlowLog withField(String key, Object value) {
return withFields(ImmutableMap.of(key, value));
}

void log();
}
7 changes: 7 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.tikv.common.log;

import java.util.Map;

public class SlowLogEmptyImpl implements SlowLog {
public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl();

Expand All @@ -40,6 +42,11 @@ public long getThresholdMS() {
@Override
public void setError(Throwable err) {}

@Override
public SlowLog withFields(Map<String, Object> fields) {
return this;
}

@Override
public void log() {}
}
30 changes: 30 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@
import java.math.BigInteger;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowLogImpl implements SlowLog {

private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class);

private static final int MAX_SPAN_SIZE = 1024;

private static final Random random = new Random();

private final List<SlowLogSpan> slowLogSpans = new ArrayList<>();
private final HashMap<String, Object> fields = new HashMap<>();
private Throwable error = null;

private final long startMS;
Expand Down Expand Up @@ -81,6 +86,12 @@ public void setError(Throwable err) {
this.error = err;
}

@Override
public SlowLog withFields(Map<String, Object> fields) {
this.fields.putAll(fields);
return this;
}

@Override
public void log() {
recordTime();
Expand Down Expand Up @@ -120,6 +131,25 @@ JsonObject getSlowLogJson() {
}
jsonObject.add("spans", jsonArray);

for (Entry<String, Object> entry : fields.entrySet()) {
Object value = entry.getValue();
if (value instanceof List) {
JsonArray field = new JsonArray();
for (Object o : (List<?>) value) {
field.add(o.toString());
}
jsonObject.add(entry.getKey(), field);
} else if (value instanceof Map) {
JsonObject field = new JsonObject();
for (Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
field.addProperty(e.getKey().toString(), e.getValue().toString());
}
jsonObject.add(entry.getKey(), field);
} else {
jsonObject.addProperty(entry.getKey(), value.toString());
}
}

return jsonObject;
}

Expand Down
61 changes: 47 additions & 14 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,31 @@

package org.tikv.raw;

import static org.tikv.common.util.ClientUtils.*;
import static org.tikv.common.util.ClientUtils.appendBatches;
import static org.tikv.common.util.ClientUtils.genUUID;
import static org.tikv.common.util.ClientUtils.getBatches;
import static org.tikv.common.util.ClientUtils.getTasks;
import static org.tikv.common.util.ClientUtils.getTasksWithOutput;
import static org.tikv.common.util.ClientUtils.groupKeysByRegion;

import com.google.protobuf.ByteString;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.concurrent.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,10 +62,19 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.*;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Batch;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.DeleteRange;
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb.KvPair;

public class RawKVClient implements RawKVClientBase {
private final long clusterId;
private final List<URI> pdAddresses;
private final TiSession tiSession;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
Expand Down Expand Up @@ -95,6 +122,12 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
this.atomicForCAS = conf.isEnableAtomicForCAS();
this.clusterId = session.getPDClient().getClusterId();
this.pdAddresses = session.getPDClient().getPdAddrs();
}

private SlowLog withClusterInfo(SlowLog logger) {
return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses);
}

@Override
Expand All @@ -110,7 +143,7 @@ public void put(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("put");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));

Expand Down Expand Up @@ -172,7 +205,7 @@ public void compareAndSet(
String label = "client_raw_compare_and_set";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("putIfAbsent");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));

Expand Down Expand Up @@ -211,7 +244,7 @@ public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchPut");
span.addProperty("keySize", String.valueOf(kvPairs.size()));

Expand All @@ -237,7 +270,7 @@ public Optional<ByteString> get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();

SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("get");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));

Expand Down Expand Up @@ -270,7 +303,7 @@ public Optional<ByteString> get(ByteString key) {
public List<KvPair> batchGet(List<ByteString> keys) {
String label = "client_raw_batch_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchGet");
span.addProperty("keySize", String.valueOf(keys.size()));
ConcreteBackOffer backOffer =
Expand All @@ -295,7 +328,7 @@ public List<KvPair> batchGet(List<ByteString> keys) {
public void batchDelete(List<ByteString> keys) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchDelete");
span.addProperty("keySize", String.valueOf(keys.size()));
ConcreteBackOffer backOffer =
Expand All @@ -320,7 +353,7 @@ public void batchDelete(List<ByteString> keys) {
public Optional<Long> getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("getKeyTTL");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer =
Expand Down Expand Up @@ -428,7 +461,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit) {
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
SlowLogSpan span = slowLog.start("scan");
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
Expand Down Expand Up @@ -473,7 +506,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey) {
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
String label = "client_raw_scan_without_limit";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
SlowLogSpan span = slowLog.start("scan");
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
Expand Down Expand Up @@ -539,7 +572,7 @@ public List<KvPair> scanPrefix(ByteString prefixKey, boolean keyOnly) {
public void delete(ByteString key) {
String label = "client_raw_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("delete");
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer =
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/org/tikv/common/log/SlowLogImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.tikv.common.log;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -63,4 +66,24 @@ public void testUnsignedLong() {
Assert.assertEquals("18446744073709551615", SlowLogImpl.toUnsignedBigInteger(-1L).toString());
Assert.assertEquals("18446744073709551614", SlowLogImpl.toUnsignedBigInteger(-2L).toString());
}

@Test
public void testWithFields() throws InterruptedException {
SlowLogImpl slowLog = new SlowLogImpl(1);
slowLog
.withField("key0", "value0")
.withField("key1", ImmutableList.of("value0", "value1"))
.withField("key2", ImmutableMap.of("key3", "value3"));

JsonObject object = slowLog.getSlowLogJson();
Assert.assertEquals("value0", object.get("key0").getAsString());

AtomicInteger i = new AtomicInteger();
object
.get("key1")
.getAsJsonArray()
.forEach(e -> Assert.assertEquals("value" + (i.getAndIncrement()), e.getAsString()));

Assert.assertEquals("value3", object.get("key2").getAsJsonObject().get("key3").getAsString());
}
}