Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 67 additions & 36 deletions metrics/grafana/client_java_summary.json

Large diffs are not rendered by default.

90 changes: 42 additions & 48 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,16 @@ public <ReqT, RespT> RespT callWithRetry(
if (logger.isTraceEnabled()) {
logger.trace(String.format("Calling %s...", method.getFullMethodName()));
}
RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer);
RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
RespT resp =
builder
.create(handler)
.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return ClientCalls.blockingUnaryCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName(),
backOffer);
policy.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return ClientCalls.blockingUnaryCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName(),
backOffer);

if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
Expand All @@ -109,20 +107,18 @@ protected <ReqT, RespT> void callAsyncWithRetry(
ErrorHandler<RespT> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName()));

RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer);
builder
.create(handler)
.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
ClientCalls.asyncUnaryCall(
stub.getChannel().newCall(method, stub.getCallOptions()),
requestFactory.get(),
responseObserver);
return null;
},
method.getFullMethodName(),
backOffer);
RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
policy.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
ClientCalls.asyncUnaryCall(
stub.getChannel().newCall(method, stub.getCallOptions()),
requestFactory.get(),
responseObserver);
return null;
},
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
}

Expand All @@ -133,18 +129,17 @@ <ReqT, RespT> StreamObserver<ReqT> callBidiStreamingWithRetry(
ErrorHandler<StreamObserver<ReqT>> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName()));

RetryPolicy.Builder<StreamObserver<ReqT>> builder = new Builder<>(backOffer);
RetryPolicy<StreamObserver<ReqT>> policy =
new Builder<StreamObserver<ReqT>>(backOffer).create(handler);
StreamObserver<ReqT> observer =
builder
.create(handler)
.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
method.getFullMethodName(),
backOffer);
policy.callWithRetry(
() -> {
FutureStubT stub = getAsyncStub();
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return observer;
}
Expand All @@ -156,19 +151,18 @@ public <ReqT, RespT> StreamingResponse callServerStreamingWithRetry(
ErrorHandler<StreamingResponse> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName()));

RetryPolicy.Builder<StreamingResponse> builder = new Builder<>(backOffer);
RetryPolicy<StreamingResponse> policy =
new Builder<StreamingResponse>(backOffer).create(handler);
StreamingResponse response =
builder
.create(handler)
.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return new StreamingResponse(
blockingServerStreamingCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
},
method.getFullMethodName(),
backOffer);
policy.callWithRetry(
() -> {
BlockingStubT stub = getBlockingStub();
return new StreamingResponse(
blockingServerStreamingCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
},
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return response;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void close() {}
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
public ByteString get(ByteString key, long version) throws GrpcException {
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(
clientBuilder.getRegionManager().getPDClient().getClusterId());
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
HistogramUtils.buildDuration()
.name("client_java_pd_get_region_by_requests_latency")
.help("pd getRegionByKey request latency.")
.labelNames("cluster")
.register();

private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
Expand Down Expand Up @@ -281,7 +282,7 @@ private GetOperatorResponse getOperator(long regionId) {
() -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
// get operator no need to handle error and no need back offer.
return callWithRetry(
ConcreteBackOffer.newCustomBackOff(0),
ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
PDGrpc.getGetOperatorMethod(),
request,
new NoopHandler<>());
Expand Down Expand Up @@ -309,7 +310,8 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) {

@Override
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
Histogram.Timer requestTimer =
PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
try {
if (conf.isTxnKVMode()) {
CodecDataOutput cdo = new CodecDataOutput();
Expand Down Expand Up @@ -841,7 +843,7 @@ private Metapb.Region decodeRegion(Metapb.Region region) {
return builder.build();
}

public long getClusterId() {
public Long getClusterId() {
return header.getClusterId();
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ List<Pdpb.Region> scanRegions(
List<Store> getAllStores(BackOffer backOffer);

TiConfiguration.ReplicaRead getReplicaRead();

Long getClusterId();
}
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public List<org.tikv.common.BytePairWrapper> batchGet(int backOffer, List<byte[]
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
List<KvPair> kvPairList =
client.batchGet(
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());
ConcreteBackOffer.newCustomBackOff(backOffer, session.getPDClient().getClusterId()),
list,
timestamp.getVersion());
return kvPairList
.stream()
.map(
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/tikv/common/StoreVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public static int compareTo(String v0, String v1) {
public static boolean minTiKVVersion(String version, PDClient pdClient) {
StoreVersion storeVersion = new StoreVersion(version);

BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
BackOffer bo =
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
List<Metapb.Store> storeList =
pdClient
.getAllStores(bo)
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public TiSession(TiConfiguration conf) {
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
warmUp();
}
this.circuitBreaker = new CircuitBreakerImpl(conf);
this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId());
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}

Expand All @@ -167,7 +167,7 @@ private static VersionInfo getVersionInfo() {

private synchronized void warmUp() {
long warmUpStartTime = System.nanoTime();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());
try {
// let JVM ClassLoader load gRPC error related classes
// this operation may cost 100ms
Expand Down Expand Up @@ -317,7 +317,8 @@ public TiConfiguration getConf() {
public TiTimestamp getTimestamp() {
checkIsClosed();

return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff());
return getPDClient()
.getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
}

public Snapshot createSnapshot() {
Expand Down Expand Up @@ -574,13 +575,16 @@ public void splitRegionAndScatter(
.stream()
.map(k -> Key.toRawKey(k).toByteString())
.collect(Collectors.toList()),
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS));
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));

// scatter region
for (Metapb.Region newRegion : newRegions) {
try {
getPDClient()
.scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS));
.scatterRegion(
newRegion,
ConcreteBackOffer.newCustomBackOff(
scatterRegionBackoffMS, getPDClient().getClusterId()));
} catch (Exception e) {
logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e);
}
Expand All @@ -597,7 +601,9 @@ public void splitRegionAndScatter(
return;
}
getPDClient()
.waitScatterRegionFinish(newRegion, ConcreteBackOffer.newCustomBackOff((int) remainMS));
.waitScatterRegionFinish(
newRegion,
ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
}
} else {
logger.info("skip to wait scatter region finish");
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/importer/ImporterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ private void ingest() throws GrpcException {
}

Object writeResponse = clientLeader.getWriteResponse();
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.INGEST_BACKOFF);
BackOffer backOffer =
ConcreteBackOffer.newCustomBackOff(
BackOffer.INGEST_BACKOFF, tiSession.getPDClient().getClusterId());
ingestWithRetry(writeResponse, backOffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private void switchTiKVToImportMode() {
}

private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) {
BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
BackOffer bo =
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
List<Metapb.Store> allStores = pdClient.getAllStores(bo);
for (Metapb.Store store : allStores) {
ImporterStoreClient client = builder.build(new TiStore(store));
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ default SlowLog withField(String key, Object value) {
return withFields(ImmutableMap.of(key, value));
}

Object getField(String key);

void log();
}
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogEmptyImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public SlowLog withFields(Map<String, Object> fields) {
return this;
}

@Override
public Object getField(String key) {
return null;
}

@Override
public void log() {}
}
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/log/SlowLogImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public SlowLog withFields(Map<String, Object> fields) {
return this;
}

@Override
public Object getField(String key) {
return fields.get(key);
}

@Override
public void log() {
recordTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first;
TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId());
try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version);
} catch (Exception e) {
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/tikv/common/policy/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ public abstract class RetryPolicy<RespT> {
HistogramUtils.buildDuration()
.name("client_java_grpc_single_requests_latency")
.help("grpc request latency.")
.labelNames("type")
.labelNames("type", "cluster")
.register();
public static final Histogram CALL_WITH_RETRY_DURATION =
HistogramUtils.buildDuration()
.name("client_java_call_with_retry_duration")
.help("callWithRetry duration.")
.labelNames("type")
.labelNames("type", "cluster")
.register();
public static final Counter GRPC_REQUEST_RETRY_NUM =
Counter.build()
.name("client_java_grpc_requests_retry_num")
.help("grpc request retry num.")
.labelNames("type")
.labelNames("type", "cluster")
.register();

// handles PD and TiKV's error.
Expand All @@ -72,16 +72,16 @@ private void rethrowNotRecoverableException(Exception e) {
}

public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer();
String[] labels = new String[] {methodName, backOffer.getClusterId().toString()};
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer();
SlowLogSpan callWithRetrySlowLogSpan = backOffer.getSlowLog().start("callWithRetry");
callWithRetrySlowLogSpan.addProperty("method", methodName);
try {
while (true) {
RespT result = null;
try {
// add single request duration histogram
Histogram.Timer requestTimer =
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC");
slowLogSpan.addProperty("method", methodName);
try {
Expand All @@ -96,7 +96,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
backOffer.checkTimeout();
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue;
} else {
return result;
Expand All @@ -107,7 +107,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue;
}
}
Expand Down
Loading