Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public <ReqT, RespT> RespT callWithRetry(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName());
if (resp != null && this.conf.getEnableGrpcForward()) {
tryUpdateProxy();
}

if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
Expand Down Expand Up @@ -177,6 +180,8 @@ public long getTimeout() {

protected abstract StubT getAsyncStub();

/**
 * Hook implemented by subclasses to update their proxy (forwarding) state.
 *
 * <p>The retrying call path invokes this after a call has returned a non-null response and
 * {@code conf.getEnableGrpcForward()} is true. What gets updated, if anything, is left to the
 * implementation.
 */
protected abstract void tryUpdateProxy();

protected boolean checkHealth(String addressStr, HostMapping hostMapping) {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
HealthGrpc.HealthBlockingStub stub =
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public class ConfigUtils {
public static final String TIKV_PD_ADDRESSES = "tikv.pd.addresses";
public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms";
public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size";
public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size";
Expand Down Expand Up @@ -54,7 +55,8 @@ public class ConfigUtils {
public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_TIMEOUT = "150ms";
public static final String DEF_FORWARD_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 40;
public static final int DEF_SCAN_BATCH_SIZE = 10240;
Expand Down
25 changes: 20 additions & 5 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ PDClientWrapper getPdClientWrapper() {
private GetMembersResponse getMembers(URI uri) {
try {
ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping);
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan);
PDGrpc.PDBlockingStub stub =
PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
GetMembersRequest request =
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
GetMembersResponse resp = stub.getMembers(request);
Expand All @@ -335,7 +336,7 @@ private GetMembersResponse getMembers(URI uri) {
}
return resp;
} catch (Exception e) {
logger.warn("failed to get member from pd server.", e);
logger.debug("failed to get member from pd server.", e);
}
return null;
}
Expand All @@ -361,6 +362,7 @@ private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) {
ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping);
pdClientWrapper =
new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime());
timeout = conf.getTimeout();
} catch (IllegalArgumentException e) {
logger.error("Error updating leader. " + leaderUrlStr, e);
return false;
Expand All @@ -380,6 +382,7 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l
// create new Leader
ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping);
pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime());
timeout = conf.getForwardTimeout();
} catch (IllegalArgumentException e) {
logger.error("Error updating follower. " + followerUrlStr, e);
return false;
Expand Down Expand Up @@ -411,6 +414,7 @@ public synchronized void updateLeaderOrforwardFollower() {
continue;
}

logger.info(String.format("can not switch to new leader, try follower forward"));
List<Pdpb.Member> members = resp.getMembersList();

boolean hasReachNextMember = false;
Expand All @@ -431,6 +435,8 @@ public synchronized void updateLeaderOrforwardFollower() {
continue;
}
if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) {
logger.warn(
String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr));
return;
}
}
Expand Down Expand Up @@ -464,8 +470,10 @@ public void tryUpdateLeader() {
return;
}
}
throw new TiClientInternalException(
"already tried all address on file, but not leader found yet.");
if (pdClientWrapper == null) {
throw new TiClientInternalException(
"already tried all address on file, but not leader found yet.");
}
}

private synchronized void tryUpdateMembers(List<URI> members) {
Expand Down Expand Up @@ -541,6 +549,9 @@ protected PDStub getAsyncStub() {
return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS);
}

/** No-op implementation of the proxy-update hook: this client has nothing to update here. */
@Override
protected void tryUpdateProxy() {}

private void initCluster() {
GetMembersResponse resp = null;
List<URI> pdAddrs = getConf().getPdAddrs();
Expand All @@ -558,13 +569,17 @@ private void initCluster() {
this.hostMapping =
Optional.ofNullable(getConf().getHostMapping())
.orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName()));
// The first request may cost too much latency
long originTimeout = this.timeout;
this.timeout = 2000;
for (URI u : pdAddrs) {
resp = getMembers(u);
if (resp != null) {
break;
}
logger.info("Could not get leader member with pd: " + u);
}
this.timeout = originTimeout;
checkNotNull(resp, "Failed to init client for PD cluster.");
long clusterId = resp.getHeader().getClusterId();
header = RequestHeader.newBuilder().setClusterId(clusterId).build();
Expand Down Expand Up @@ -654,7 +669,7 @@ long getCreateTime() {

@Override
public String toString() {
return "[leaderInfo: " + leaderInfo + "]";
return "[leaderInfo: " + leaderInfo + ", storeAddress: " + storeAddress + "]";
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private static void loadFromSystemProperties() {
private static void loadFromDefaultProperties() {
setIfMissing(TIKV_PD_ADDRESSES, DEF_PD_ADDRESSES);
setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT);
setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE);
setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE);
Expand Down Expand Up @@ -237,6 +238,7 @@ private static ReplicaRead getReplicaRead(String key) {
}

private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT);
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
private List<URI> pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES);
Expand Down Expand Up @@ -334,6 +336,15 @@ public TiConfiguration setTimeout(long timeout) {
return this;
}

/**
 * Returns the timeout configured for forwarded gRPC requests.
 *
 * @return the forward timeout; the field is initialised via
 *     {@code getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT)}, so the value is presumably in milliseconds
 */
public long getForwardTimeout() {
return forwardTimeout;
}

/**
 * Overrides the timeout configured for forwarded gRPC requests.
 *
 * @param timeout the new forward timeout, stored as-is without validation (presumably
 *     milliseconds, matching the value returned by {@code getForwardTimeout()})
 * @return this configuration instance, so setter calls can be chained
 */
public TiConfiguration setForwardTimeout(long timeout) {
this.forwardTimeout = timeout;
return this;
}

/**
 * Returns the timeout configured for scan requests.
 *
 * @return the scan timeout; the field is initialised via
 *     {@code getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT)}, so the value is presumably in milliseconds
 */
public long getScanTimeout() {
return scanTimeout;
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public TiSession(TiConfiguration conf) {
this.client = PDClient.createRaw(conf, channelFactory);
this.enableGrpcForward = conf.getEnableGrpcForward();
this.metricsServer = MetricsServer.getInstance(conf);
if (this.enableGrpcForward) {
logger.info("enable grpc forward for high available");
}
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable()) {
return true;
} else {
regionManager.onRequestFail(recv.getRegion());
}

backOffer.doBackOff(
Expand Down
81 changes: 57 additions & 24 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.grpc.stub.MetadataUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
Expand All @@ -38,10 +40,13 @@
public abstract class AbstractRegionStoreClient
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvStub>
implements RegionErrorReceiver {
private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class);

protected final RegionManager regionManager;
protected TiRegion region;
protected TiStore targetStore;
protected TiStore originStore;
protected long retryTimes;

protected AbstractRegionStoreClient(
TiConfiguration conf,
Expand All @@ -58,6 +63,11 @@ protected AbstractRegionStoreClient(
this.region = region;
this.regionManager = regionManager;
this.targetStore = store;
this.originStore = null;
this.retryTimes = 0;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
}
}

public TiRegion getRegion() {
Expand Down Expand Up @@ -106,26 +116,46 @@ public boolean onNotLeader(TiRegion newRegion) {
@Override
public boolean onStoreUnreachable() {
if (!conf.getEnableGrpcForward()) {
regionManager.onRequestFail(region);
return false;
}
if (region.getProxyStore() == null) {
if (targetStore.getProxyStore() == null) {
if (!targetStore.isUnreachable()) {
if (checkHealth(targetStore)) {
if (checkHealth(targetStore.getStore())) {
return true;
} else {
if (targetStore.markUnreachable()) {
this.regionManager.scheduleHealthCheckJob(targetStore);
}
}
}
} else if (retryTimes > region.getFollowerList().size()) {
logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region and store[%d]",
region.getId(), targetStore.getId()));
regionManager.onRequestFail(region);
return false;
}
TiRegion proxyRegion = switchProxyStore();
if (proxyRegion == null) {
TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
String.format(
"no forward store can be selected for store [%s] and region[%d]",
targetStore.getStore().getAddress(), region.getId()));
return false;
}
regionManager.updateRegion(region, proxyRegion);
region = proxyRegion;
String addressStr = region.getProxyStore().getStore().getAddress();
if (originStore == null) {
originStore = targetStore;
if (this.targetStore.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
}
}
targetStore = proxyStore;
retryTimes += 1;
logger.warn(
String.format(
"forward request to store [%s] by store [%s] for region[%d]",
targetStore.getStore().getAddress(),
targetStore.getProxyStore().getAddress(),
region.getId()));
String addressStr = targetStore.getProxyStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
Metadata header = new Metadata();
Expand All @@ -135,11 +165,15 @@ public boolean onStoreUnreachable() {
return true;
}

private boolean checkHealth(TiStore store) {
if (store.getStore() == null) {
return false;
@Override
protected void tryUpdateProxy() {
if (originStore != null) {
regionManager.updateStore(originStore, targetStore);
}
String addressStr = store.getStore().getAddress();
}

private boolean checkHealth(Metapb.Store store) {
String addressStr = store.getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
HealthGrpc.HealthBlockingStub stub =
Expand All @@ -157,26 +191,25 @@ private boolean checkHealth(TiStore store) {
return true;
}

private TiRegion switchProxyStore() {
private TiStore switchProxyStore() {
boolean hasVisitedStore = false;
List<Metapb.Peer> peers = region.getFollowerList();
for (int i = 0; i < peers.size() * 2; i++) {
int idx = i % peers.size();
Metapb.Peer peer = peers.get(idx);
if (peer.getStoreId() != region.getLeader().getStoreId()) {
if (region.getProxyStore() == null) {
if (targetStore.getProxyStore() == null) {
TiStore store = regionManager.getStoreById(peer.getStoreId());
if (checkHealth(store)) {
return region.switchProxyStore(store);
if (checkHealth(store.getStore())) {
return targetStore.withProxy(store.getStore());
}
} else {
TiStore proxyStore = region.getProxyStore();
if (peer.getStoreId() == proxyStore.getStore().getId()) {
if (peer.getStoreId() == targetStore.getProxyStore().getId()) {
hasVisitedStore = true;
} else if (hasVisitedStore) {
proxyStore = regionManager.getStoreById(peer.getStoreId());
if (!proxyStore.isUnreachable() && checkHealth(proxyStore)) {
return region.switchProxyStore(proxyStore);
TiStore proxyStore = regionManager.getStoreById(peer.getStoreId());
if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) {
return targetStore.withProxy(proxyStore.getStore());
}
}
}
Expand Down
Loading