Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 164 additions & 36 deletions metrics/grafana/client_java_summary.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void close() {}
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
public ByteString get(ByteString key, long version) throws GrpcException {
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(
clientBuilder.getRegionManager().getPDClient().getClusterId());
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
HistogramUtils.buildDuration()
.name("client_java_pd_get_region_by_requests_latency")
.help("pd getRegionByKey request latency.")
.labelNames("cluster")
.register();

private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
Expand Down Expand Up @@ -203,7 +204,7 @@ private GetOperatorResponse getOperator(long regionId) {
() -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
// getOperator needs neither error handling nor a back-off.
return callWithRetry(
ConcreteBackOffer.newCustomBackOff(0),
ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
PDGrpc.getGetOperatorMethod(),
request,
new NoopHandler<>());
Expand Down Expand Up @@ -231,7 +232,8 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {

@Override
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
Histogram.Timer requestTimer =
PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
try {
if (conf.getKvMode() == KVMode.TXN) {
CodecDataOutput cdo = new CodecDataOutput();
Expand Down Expand Up @@ -745,7 +747,8 @@ private Metapb.Region decodeRegion(Metapb.Region region) {
return builder.build();
}

public long getClusterId() {
@Override
public Long getClusterId() {
return header.getClusterId();
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ List<Pdpb.Region> scanRegions(
List<Store> getAllStores(BackOffer backOffer);

TiConfiguration.ReplicaRead getReplicaRead();

Long getClusterId();
}
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ public List<org.tikv.common.BytePairWrapper> batchGet(int backOffer, List<byte[]
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
List<KvPair> kvPairList =
client.batchGet(
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());
ConcreteBackOffer.newCustomBackOff(backOffer, session.getPDClient().getClusterId()),
list,
timestamp.getVersion());
return kvPairList
.stream()
.map(
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/tikv/common/StoreVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public static int compareTo(String v0, String v1) {
public static boolean minTiKVVersion(String version, PDClient pdClient) {
StoreVersion storeVersion = new StoreVersion(version);

BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
BackOffer bo =
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
List<Metapb.Store> storeList =
pdClient
.getAllStores(bo)
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ public TiSession(TiConfiguration conf) {
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
warmUp();
}
this.circuitBreaker = new CircuitBreakerImpl(conf);
this.circuitBreaker = new CircuitBreakerImpl(conf, getPDClient().getClusterId());
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}

private synchronized void warmUp() {
long warmUpStartTime = System.nanoTime();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());

try {
// let the JVM ClassLoader load gRPC error-related classes
Expand Down Expand Up @@ -128,7 +128,9 @@ private synchronized void warmUp() {
}
for (Pdpb.Region region : regions) {
regionManager.insertRegionToCache(
regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff()));
regionManager.createRegion(
region.getRegion(),
ConcreteBackOffer.newGetBackOff(getPDClient().getClusterId())));
}
startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
} while (!startKey.isEmpty());
Expand Down Expand Up @@ -226,7 +228,8 @@ public TiConfiguration getConf() {
public TiTimestamp getTimestamp() {
checkIsClosed();

return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
return getPDClient()
.getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
}

public Snapshot createSnapshot() {
Expand Down Expand Up @@ -459,7 +462,7 @@ public void splitRegionAndScatter(
.stream()
.map(k -> Key.toRawKey(k).next().toByteString())
.collect(Collectors.toList()),
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));

// scatter region
for (Metapb.Region newRegion : newRegions) {
Expand All @@ -482,7 +485,9 @@ public void splitRegionAndScatter(
return;
}
getPDClient()
.waitScatterRegionFinish(newRegion, ConcreteBackOffer.newCustomBackOff((int) remainMS));
.waitScatterRegionFinish(
newRegion,
ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
}
} else {
logger.info("skip to wait scatter region finish");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
region = client.getRegion();
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
BackOffer backOffer =
ConcreteBackOffer.newScannerNextMaxBackOff(
builder.getRegionManager().getPDClient().getClusterId());
currentCache = client.scan(backOffer, startKey, version);
return region;
}
Expand All @@ -86,7 +88,8 @@ private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first;
TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId());
try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version);
} catch (Exception e) {
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/tikv/common/policy/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ public abstract class RetryPolicy<RespT> {
HistogramUtils.buildDuration()
.name("client_java_grpc_single_requests_latency")
.help("grpc request latency.")
.labelNames("type")
.labelNames("type", "cluster")
.register();
public static final Histogram CALL_WITH_RETRY_DURATION =
HistogramUtils.buildDuration()
.name("client_java_call_with_retry_duration")
.help("callWithRetry duration.")
.labelNames("type")
.labelNames("type", "cluster")
.register();
public static final Counter GRPC_REQUEST_RETRY_NUM =
Counter.build()
.name("client_java_grpc_requests_retry_num")
.help("grpc request retry num.")
.labelNames("type")
.labelNames("type", "cluster")
.register();

// handles PD and TiKV's error.
Expand All @@ -70,16 +70,16 @@ private void rethrowNotRecoverableException(Exception e) {
}

public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer();
String[] labels = new String[] {methodName, backOffer.getClusterId().toString()};
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer();
SlowLogSpan callWithRetrySlowLogSpan =
backOffer.getSlowLog().start("callWithRetry " + methodName);
try {
while (true) {
RespT result = null;
try {
// add single request duration histogram
Histogram.Timer requestTimer =
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC " + methodName);
try {
result = proc.call();
Expand All @@ -93,7 +93,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
backOffer.checkTimeout();
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue;
} else {
return result;
Expand All @@ -104,7 +104,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ public abstract class AbstractRegionStoreClient
HistogramUtils.buildDuration()
.name("client_java_seek_leader_store_duration")
.help("seek leader store duration.")
.labelNames("cluster")
.register();

public static final Histogram SEEK_PROXY_STORE_DURATION =
HistogramUtils.buildDuration()
.name("client_java_seek_proxy_store_duration")
.help("seek proxy store duration.")
.labelNames("cluster")
.register();

protected final RegionManager regionManager;
Expand Down Expand Up @@ -181,7 +183,10 @@ private void updateClientStub() {
}

private Boolean seekLeaderStore(BackOffer backOffer) {
Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer();
Histogram.Timer switchLeaderDurationTimer =
SEEK_LEADER_STORE_DURATION
.labels(regionManager.getPDClient().getClusterId().toString())
.startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore");
try {
List<Metapb.Peer> peers = region.getFollowerList();
Expand Down Expand Up @@ -230,7 +235,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) {

private boolean seekProxyStore(BackOffer backOffer) {
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore");
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer();
Histogram.Timer grpcForwardDurationTimer =
SEEK_PROXY_STORE_DURATION
.labels(regionManager.getPDClient().getClusterId().toString())
.startTimer();
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
// when current leader cannot be reached
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ public class RegionManager {
HistogramUtils.buildDuration()
.name("client_java_get_region_by_requests_latency")
.help("getRegionByKey request latency.")
.labelNames("cluster")
.register();
public static final Histogram SCAN_REGIONS_REQUEST_LATENCY =
HistogramUtils.buildDuration()
.name("client_java_scan_regions_request_latency")
.help("scanRegions request latency.")
.labelNames("cluster")
.register();

// TODO: the region cache logic needs a rewrite.
Expand Down Expand Up @@ -101,7 +103,8 @@ public ReadOnlyPDClient getPDClient() {

public List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer();
Histogram.Timer requestTimer =
SCAN_REGIONS_REQUEST_LATENCY.labels(getPDClient().getClusterId().toString()).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions");
try {
return pdClient.scanRegions(backOffer, startKey, endKey, limit);
Expand All @@ -118,7 +121,10 @@ public TiRegion getRegionByKey(ByteString key) {
}

public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
Histogram.Timer requestTimer =
GET_REGION_BY_KEY_REQUEST_LATENCY
.labels(getPDClient().getClusterId().toString())
.startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey");
TiRegion region = cache.getRegionByKey(key, backOffer);
try {
Expand Down Expand Up @@ -311,6 +317,7 @@ public void insertRegionToCache(TiRegion region) {
}

private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
return ConcreteBackOffer.newCustomBackOff(
conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId());
}
}
Loading