Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ public <ReqT, RespT> RespT callWithRetry(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName());
if (resp != null && this.conf.getEnableGrpcForward()) {
tryUpdateProxy();
}

if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
Expand Down Expand Up @@ -180,8 +177,6 @@ public long getTimeout() {

protected abstract StubT getAsyncStub();

protected abstract void tryUpdateProxy();

protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
HealthGrpc.HealthBlockingStub stub =
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,6 @@ protected PDStub getAsyncStub() {
return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}

@Override
protected void tryUpdateProxy() {}

private void initCluster() {
GetMembersResponse resp = null;
List<URI> pdAddrs = getConf().getPdAddrs();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tikv/common/operation/KVErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
Errorpb.Error error = regionHandler.getRegionError(resp);
if (error != null) {
return regionHandler.handleRegionError(backOffer, error);
} else {
regionHandler.tryUpdateRegionStore();
}

// Key error handling logic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
Errorpb.Error error = getRegionError(resp);
if (error != null) {
return handleRegionError(backOffer, error);
} else {
tryUpdateRegionStore();
}
return false;
}

public void tryUpdateRegionStore() {
recv.tryUpdateRegionStore();
}

public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
if (error.hasNotLeader()) {
// this error is reported from raftstore:
Expand Down Expand Up @@ -168,6 +174,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable()) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoTiKVRPC, e);
return true;
}

Expand Down
227 changes: 155 additions & 72 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@

import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.MetadataUtils;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;

Expand All @@ -46,7 +45,9 @@ public abstract class AbstractRegionStoreClient
protected TiRegion region;
protected TiStore targetStore;
protected TiStore originStore;
protected long retryTimes;
private long retryForwardTimes;
private long retryLeaderTimes;
private Metapb.Peer candidateLeader;

protected AbstractRegionStoreClient(
TiConfiguration conf,
Expand All @@ -64,12 +65,17 @@ protected AbstractRegionStoreClient(
this.regionManager = regionManager;
this.targetStore = store;
this.originStore = null;
this.retryTimes = 0;
this.candidateLeader = null;
this.retryForwardTimes = 0;
this.retryLeaderTimes = 0;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
} else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) {
onStoreUnreachable();
}
}

@Override
public TiRegion getRegion() {
return region;
}
Expand Down Expand Up @@ -103,44 +109,155 @@ public boolean onNotLeader(TiRegion newRegion) {
if (!region.getRegionEpoch().equals(newRegion.getRegionEpoch())) {
return false;
}

// If we try one peer but find the leader has not changed, we do not need try other peers.
if (candidateLeader != null
&& region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) {
retryLeaderTimes = newRegion.getFollowerList().size();
originStore = null;
}
candidateLeader = null;
region = newRegion;
targetStore = regionManager.getStoreById(region.getLeader().getStoreId());
originStore = null;
String addressStr = targetStore.getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
updateClientStub();
return true;
}

@Override
public boolean onStoreUnreachable() {
if (!conf.getEnableGrpcForward()) {
regionManager.onRequestFail(region);
if (targetStore.getProxyStore() == null) {
if (targetStore.isReachable()) {
return true;
}
}

// If this store has failed to forward request too many times, we shall try other peer at first
// so that we can
// reduce the latency cost by fail requests.
if (targetStore.canForwardFirst()) {
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
}
if (retryOtherStoreLeader()) {
return true;
}
} else {
if (retryOtherStoreLeader()) {
return true;
}
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
}
return true;
}

logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
}

protected Kvrpcpb.Context makeContext(TiStoreType storeType) {
if (candidateLeader != null && storeType == TiStoreType.TiKV) {
return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet());
} else {
return region.getReplicaContext(java.util.Collections.emptySet(), storeType);
}
}

protected Kvrpcpb.Context makeContext(Set<Long> resolvedLocks, TiStoreType storeType) {
if (candidateLeader != null && storeType == TiStoreType.TiKV) {
return region.getReplicaContext(candidateLeader, resolvedLocks);
} else {
return region.getReplicaContext(resolvedLocks, storeType);
}
}

@Override
public void tryUpdateRegionStore() {
if (originStore != null) {
if (originStore.getId() == targetStore.getId()) {
logger.warn(
String.format(
"update store [%s] by proxy-store [%s]",
targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress()));
// We do not need to mark the store can-forward, because if one store has grpc forward
// successfully, it will
// create a new store object, which is can-forward.
regionManager.updateStore(originStore, targetStore);
} else {
// If we try to forward request to leader by follower failed, it means that the store of old
// leader may be
// unavailable but the new leader has not been report to PD. So we can ban this store for a
// short time to
// avoid too many request try forward rather than try other peer.
originStore.forwardFail();
}
}
if (candidateLeader != null) {
logger.warn(
String.format(
"update leader to store [%d] for region[%d]",
candidateLeader.getStoreId(), region.getId()));
this.regionManager.updateLeader(region, candidateLeader.getStoreId());
}
}

private boolean retryOtherStoreLeader() {
List<Metapb.Peer> peers = region.getFollowerList();
if (retryLeaderTimes >= peers.size()) {
return false;
}
if (targetStore.getProxyStore() == null) {
if (!targetStore.isUnreachable()) {
if (checkHealth(targetStore.getStore())) {
retryLeaderTimes += 1;
boolean hasVisitedStore = false;
for (Metapb.Peer cur : peers) {
if (candidateLeader == null || hasVisitedStore) {
TiStore store = regionManager.getStoreById(cur.getStoreId());
if (store != null && store.isReachable()) {
targetStore = store;
candidateLeader = cur;
logger.warn(
String.format(
"try store [%d],peer[%d] for region[%d], which may be new leader",
targetStore.getId(), candidateLeader.getId(), region.getId()));
updateClientStub();
return true;
} else {
continue;
}
} else if (candidateLeader.getId() == cur.getId()) {
hasVisitedStore = true;
}
} else if (retryTimes > region.getFollowerList().size()) {
}
candidateLeader = null;
retryLeaderTimes = peers.size();
return false;
}

private void updateClientStub() {
String addressStr = targetStore.getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
}

private boolean retryOtherStoreByProxyForward() {
if (!targetStore.isValid()) {
targetStore = regionManager.getStoreById(targetStore.getId());
logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
String.format("store [%d] has been invalid", region.getId(), targetStore.getId()));
return true;
}

TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
String.format(
"no forward store can be selected for store [%s] and region[%d]",
targetStore.getStore().getAddress(), region.getId()));
regionManager.onRequestFail(region);
return false;
}
if (originStore == null) {
Expand All @@ -150,7 +267,7 @@ public boolean onStoreUnreachable() {
}
}
targetStore = proxyStore;
retryTimes += 1;
retryForwardTimes += 1;
logger.warn(
String.format(
"forward request to store [%s] by store [%s] for region[%d]",
Expand All @@ -167,58 +284,24 @@ public boolean onStoreUnreachable() {
return true;
}

@Override
protected void tryUpdateProxy() {
if (originStore != null) {
logger.warn(
String.format(
"update store [%s] by proxy-store [%s]",
targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress()));
regionManager.updateStore(originStore, targetStore);
}
}

private boolean checkHealth(Metapb.Store store) {
String addressStr = store.getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
HealthGrpc.HealthBlockingStub stub =
HealthGrpc.newBlockingStub(channel)
.withDeadlineAfter(conf.getGrpcHealthCheckTimeout(), TimeUnit.MILLISECONDS);
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
try {
HealthCheckResponse resp = stub.check(req);
if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) {
return false;
}
} catch (Exception e) {
return false;
}
return true;
}

private TiStore switchProxyStore() {
boolean hasVisitedStore = false;
List<Metapb.Peer> peers = region.getFollowerList();
for (int i = 0; i < peers.size() * 2; i++) {
int idx = i % peers.size();
Metapb.Peer peer = peers.get(idx);
if (peer.getStoreId() != region.getLeader().getStoreId()) {
if (targetStore.getProxyStore() == null) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (checkHealth(store.getStore())) {
return targetStore.withProxy(store.getStore());
}
} else {
if (peer.getStoreId() == targetStore.getProxyStore().getId()) {
hasVisitedStore = true;
} else if (hasVisitedStore) {
TiStore proxyStore = regionManager.getStoreById(peer.getStoreId());
if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) {
return targetStore.withProxy(proxyStore.getStore());
}
}
if (peers.isEmpty()) {
return null;
}
Metapb.Store proxyStore = targetStore.getProxyStore();
if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) {
hasVisitedStore = true;
}
for (Metapb.Peer peer : peers) {
if (hasVisitedStore) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (store.isReachable()) {
return targetStore.withProxy(store.getStore());
}
} else if (peer.getStoreId() == proxyStore.getId()) {
hasVisitedStore = true;
}
}
return null;
Expand Down
Loading