Skip to content
5 changes: 0 additions & 5 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ public <ReqT, RespT> RespT callWithRetry(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName());
if (resp != null && this.conf.getEnableGrpcForward()) {
tryUpdateProxy();
}

if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
Expand Down Expand Up @@ -180,8 +177,6 @@ public long getTimeout() {

protected abstract StubT getAsyncStub();

/**
 * Callback invoked by {@code callWithRetry} once a call has produced a non-null response and
 * {@code conf.getEnableGrpcForward()} is true. Subclasses decide what, if anything, to update.
 */
protected abstract void tryUpdateProxy();

protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
HealthGrpc.HealthBlockingStub stub =
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,6 @@ protected PDStub getAsyncStub() {
return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}

/** No-op implementation of the proxy-update callback for this client. */
@Override
protected void tryUpdateProxy() {
  // Intentionally empty: this client takes no action when the callback is invoked.
}

private void initCluster() {
GetMembersResponse resp = null;
List<URI> pdAddrs = getConf().getPdAddrs();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/operation/KVErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
Errorpb.Error error = regionHandler.getRegionError(resp);
if (error != null) {
return regionHandler.handleRegionError(backOffer, error);
} else {
regionHandler.tryUpdateRegionStore();
}

// Key error handling logic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
Errorpb.Error error = getRegionError(resp);
if (error != null) {
return handleRegionError(backOffer, error);
} else {
tryUpdateRegionStore();
}
return false;
}

/**
 * Forwards the "update region store" notification to the receiver held by this handler.
 *
 * <p>{@code handleResponseError} calls this when the response carries no region error.
 */
public void tryUpdateRegionStore() {
  this.recv.tryUpdateRegionStore();
}

public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
if (error.hasNotLeader()) {
// this error is reported from raftstore:
Expand Down
227 changes: 155 additions & 72 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@

import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.MetadataUtils;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;

Expand All @@ -46,7 +45,9 @@ public abstract class AbstractRegionStoreClient
protected TiRegion region;
protected TiStore targetStore;
protected TiStore originStore;
protected long retryTimes;
private long retryForwardTimes;
private long retryLeaderTimes;
private Metapb.Peer candidateLeader;

protected AbstractRegionStoreClient(
TiConfiguration conf,
Expand All @@ -64,12 +65,17 @@ protected AbstractRegionStoreClient(
this.regionManager = regionManager;
this.targetStore = store;
this.originStore = null;
this.retryTimes = 0;
this.candidateLeader = null;
this.retryForwardTimes = 0;
this.retryLeaderTimes = 0;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
} else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) {
onStoreUnreachable();
}
}

/**
 * Returns the region currently held by this client.
 *
 * @return the value of the {@code region} field
 */
@Override
public TiRegion getRegion() {
  return this.region;
}
Expand Down Expand Up @@ -103,44 +109,155 @@ public boolean onNotLeader(TiRegion newRegion) {
if (!region.getRegionEpoch().equals(newRegion.getRegionEpoch())) {
return false;
}

// If we tried a candidate peer but the leader turned out to be unchanged, there is no need to
// try the other peers.
if (candidateLeader != null
&& region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
retryLeaderTimes = newRegion.getFollowerList().size();
originStore = null;
}
candidateLeader = null;
region = newRegion;
targetStore = regionManager.getStoreById(region.getLeader().getStoreId());
originStore = null;
String addressStr = targetStore.getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
updateClientStub();
return true;
}

@Override
public boolean onStoreUnreachable() {
if (!conf.getEnableGrpcForward()) {
regionManager.onRequestFail(region);
if (targetStore.getProxyStore() == null) {
if (targetStore.isReachable()) {
return true;
}
}

// If forwarding requests through this store has failed too many times, try the other peers
// first, so that less latency is wasted on requests that are going to fail.
if (targetStore.canForwardFirst()) {
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
}
if (retryOtherStoreLeader()) {
return true;
}
} else {
if (retryOtherStoreLeader()) {
return true;
}
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
}
return true;
}

logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
}

/**
 * Builds the request context for the current region with an empty resolved-locks set.
 *
 * <p>When a candidate leader is being probed and the request targets TiKV, the context is built
 * for that candidate peer; otherwise it is built from the store type.
 *
 * @param storeType kind of store the request is addressed to
 * @return the context obtained from {@code region.getReplicaContext(...)}
 */
protected Kvrpcpb.Context makeContext(TiStoreType storeType) {
  if (candidateLeader == null || storeType != TiStoreType.TiKV) {
    return region.getReplicaContext(java.util.Collections.emptySet(), storeType);
  }
  return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet());
}

/**
 * Builds the request context for the current region using the given resolved locks.
 *
 * <p>When a candidate leader is being probed and the request targets TiKV, the context is built
 * for that candidate peer; otherwise it is built from the store type.
 *
 * @param resolvedLocks locks passed through to {@code region.getReplicaContext(...)}
 * @param storeType kind of store the request is addressed to
 * @return the context obtained from {@code region.getReplicaContext(...)}
 */
protected Kvrpcpb.Context makeContext(Set<Long> resolvedLocks, TiStoreType storeType) {
  boolean probingCandidate = candidateLeader != null && storeType == TiStoreType.TiKV;
  return probingCandidate
      ? region.getReplicaContext(candidateLeader, resolvedLocks)
      : region.getReplicaContext(resolvedLocks, storeType);
}

/**
 * Reports the outcome of the retry strategy used by this client back to shared state.
 *
 * <p>Two independent updates may happen:
 *
 * <ul>
 *   <li>If an origin store was recorded: when the target store has the same id, the region
 *       manager is told to replace the origin store with the target store; otherwise the origin
 *       store is marked with {@code forwardFail()}.
 *   <li>If a candidate leader is set, the region manager is told to switch the region's leader
 *       to that candidate's store.
 * </ul>
 */
@Override
public void tryUpdateRegionStore() {
  if (originStore != null) {
    if (originStore.getId() != targetStore.getId()) {
      // Per the original note: forwarding to the old leader through a follower failed, so the
      // old leader's store may be unavailable while the new leader has not yet been reported
      // to PD. Ban forwarding on that store for a short time so that requests try other peers
      // instead of repeatedly attempting to forward.
      originStore.forwardFail();
    } else {
      String targetAddress = targetStore.getStore().getAddress();
      String proxyAddress = targetStore.getProxyStore().getAddress();
      logger.warn(
          String.format("update store [%s] by proxy-store [%s]", targetAddress, proxyAddress));
      // Per the original note: there is no need to mark the store as can-forward here, because
      // a successful gRPC forward creates a new store object that is already can-forward.
      regionManager.updateStore(originStore, targetStore);
    }
  }

  if (candidateLeader == null) {
    return;
  }
  long newLeaderStoreId = candidateLeader.getStoreId();
  logger.warn(
      String.format(
          "update leader to store [%d] for region[%d]", newLeaderStoreId, region.getId()));
  this.regionManager.updateLeader(region, candidateLeader.getStoreId());
}

private boolean retryOtherStoreLeader() {
List<Metapb.Peer> peers = region.getFollowerList();
if (retryLeaderTimes >= peers.size()) {
return false;
}
if (targetStore.getProxyStore() == null) {
if (!targetStore.isUnreachable()) {
if (checkHealth(targetStore.getStore())) {
retryLeaderTimes += 1;
boolean hasVisitedStore = false;
for (Metapb.Peer cur : peers) {
if (candidateLeader == null || hasVisitedStore) {
TiStore store = regionManager.getStoreById(cur.getStoreId());
if (store != null && store.isReachable()) {
targetStore = store;
candidateLeader = cur;
logger.warn(
String.format(
"try store [%d],peer[%d] for region[%d], which may be new leader",
targetStore.getId(), candidateLeader.getId(), region.getId()));
updateClientStub();
return true;
} else {
continue;
}
} else if (candidateLeader.getId() == cur.getId()) {
hasVisitedStore = true;
}
} else if (retryTimes > region.getFollowerList().size()) {
}
candidateLeader = null;
retryLeaderTimes = peers.size();
return false;
}

/**
 * Rebuilds the blocking and async TiKV stubs so that they point at the current
 * {@code targetStore}. The channel is obtained from the channel factory using the store's
 * address and the PD client's host mapping.
 */
private void updateClientStub() {
  ManagedChannel storeChannel =
      channelFactory.getChannel(
          targetStore.getStore().getAddress(), regionManager.getPDClient().getHostMapping());
  blockingStub = TikvGrpc.newBlockingStub(storeChannel);
  asyncStub = TikvGrpc.newStub(storeChannel);
}

private boolean retryOtherStoreByProxyForward() {
if (!targetStore.isValid()) {
targetStore = regionManager.getStoreById(targetStore.getId());
logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
String.format("store [%d] has been invalid", region.getId(), targetStore.getId()));
return true;
}

TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
String.format(
"no forward store can be selected for store [%s] and region[%d]",
targetStore.getStore().getAddress(), region.getId()));
regionManager.onRequestFail(region);
return false;
}
if (originStore == null) {
Expand All @@ -150,7 +267,7 @@ public boolean onStoreUnreachable() {
}
}
targetStore = proxyStore;
retryTimes += 1;
retryForwardTimes += 1;
logger.warn(
String.format(
"forward request to store [%s] by store [%s] for region[%d]",
Expand All @@ -167,58 +284,24 @@ public boolean onStoreUnreachable() {
return true;
}

/**
 * If an origin store was recorded, logs the switch and asks the region manager to replace the
 * origin store with the current target store. Does nothing when no origin store is set.
 */
@Override
protected void tryUpdateProxy() {
  if (originStore == null) {
    return;
  }
  String targetAddress = targetStore.getStore().getAddress();
  String proxyAddress = targetStore.getProxyStore().getAddress();
  logger.warn(
      String.format("update store [%s] by proxy-store [%s]", targetAddress, proxyAddress));
  regionManager.updateStore(originStore, targetStore);
}

/**
 * Probes a store through the gRPC health service.
 *
 * @param store store whose address is probed
 * @return {@code true} only if the health check answers {@code SERVING} within
 *     {@code conf.getGrpcHealthCheckTimeout()} milliseconds; {@code false} for any other status
 *     or if the check throws
 */
private boolean checkHealth(Metapb.Store store) {
  ManagedChannel healthChannel =
      channelFactory.getChannel(
          store.getAddress(), regionManager.getPDClient().getHostMapping());
  HealthGrpc.HealthBlockingStub healthStub =
      HealthGrpc.newBlockingStub(healthChannel)
          .withDeadlineAfter(conf.getGrpcHealthCheckTimeout(), TimeUnit.MILLISECONDS);
  HealthCheckRequest probe = HealthCheckRequest.newBuilder().build();
  try {
    HealthCheckResponse answer = healthStub.check(probe);
    return answer.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
  } catch (Exception e) {
    // Best-effort probe: any failure of the check is reported as "not healthy".
    return false;
  }
}

private TiStore switchProxyStore() {
boolean hasVisitedStore = false;
List<Metapb.Peer> peers = region.getFollowerList();
for (int i = 0; i < peers.size() * 2; i++) {
int idx = i % peers.size();
Metapb.Peer peer = peers.get(idx);
if (peer.getStoreId() != region.getLeader().getStoreId()) {
if (targetStore.getProxyStore() == null) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (checkHealth(store.getStore())) {
return targetStore.withProxy(store.getStore());
}
} else {
if (peer.getStoreId() == targetStore.getProxyStore().getId()) {
hasVisitedStore = true;
} else if (hasVisitedStore) {
TiStore proxyStore = regionManager.getStoreById(peer.getStoreId());
if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) {
return targetStore.withProxy(proxyStore.getStore());
}
}
if (peers.isEmpty()) {
return null;
}
Metapb.Store proxyStore = targetStore.getProxyStore();
if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) {
hasVisitedStore = true;
}
for (Metapb.Peer peer : peers) {
if (hasVisitedStore) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (store.isReachable()) {
return targetStore.withProxy(store.getStore());
}
} else if (peer.getStoreId() == proxyStore.getId()) {
hasVisitedStore = true;
}
}
return null;
Expand Down
Loading