diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 813b98629aa..7f1de71a409 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -89,6 +89,9 @@ public RespT callWithRetry( stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); }, method.getFullMethodName()); + if (resp != null && this.conf.getEnableGrpcForward()) { + tryUpdateProxy(); + } if (logger.isTraceEnabled()) { logger.trace(String.format("leaving %s...", method.getFullMethodName())); @@ -177,6 +180,8 @@ public long getTimeout() { protected abstract StubT getAsyncStub(); + protected abstract void tryUpdateProxy(); + protected boolean checkHealth(String addressStr, HostMapping hostMapping) { ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); HealthGrpc.HealthBlockingStub stub = diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 0c2460e566e..1ffe00c5d4d 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -20,6 +20,7 @@ public class ConfigUtils { public static final String TIKV_PD_ADDRESSES = "tikv.pd.addresses"; public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms"; + public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size"; public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size"; @@ -54,7 +55,8 @@ public class ConfigUtils { public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; - public static final String DEF_TIMEOUT = "600ms"; + public static final String DEF_TIMEOUT = "150ms"; + public static final String DEF_FORWARD_TIMEOUT = "600ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; public static final int DEF_CHECK_HEALTH_TIMEOUT = 40; public static final int DEF_SCAN_BATCH_SIZE = 10240; diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 23c5d459664..9de9d857b35 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -325,7 +325,8 @@ PDClientWrapper getPdClientWrapper() { private GetMembersResponse getMembers(URI uri) { try { ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); - PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan); + PDGrpc.PDBlockingStub stub = + PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); GetMembersRequest request = GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build(); GetMembersResponse resp = stub.getMembers(request); @@ -335,7 +336,7 @@ private GetMembersResponse getMembers(URI uri) { } return resp; } catch (Exception e) { - logger.warn("failed to get member from pd server.", e); + logger.debug("failed to get member from pd server.", e); } return null; } @@ -361,6 +362,7 @@ private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) { ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping); pdClientWrapper = new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime()); + timeout = conf.getTimeout(); } catch (IllegalArgumentException e) { logger.error("Error updating leader. " + leaderUrlStr, e); return false; @@ -380,6 +382,7 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l // create new Leader ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping); pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime()); + timeout = conf.getForwardTimeout(); } catch (IllegalArgumentException e) { logger.error("Error updating follower. " + followerUrlStr, e); return false; @@ -411,6 +414,7 @@ public synchronized void updateLeaderOrforwardFollower() { continue; } + logger.info(String.format("can not switch to new leader, try follower forward")); List members = resp.getMembersList(); boolean hasReachNextMember = false; @@ -431,6 +435,8 @@ public synchronized void updateLeaderOrforwardFollower() { continue; } if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) { + logger.warn( + String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr)); return; } } @@ -464,8 +470,10 @@ public void tryUpdateLeader() { return; } } - throw new TiClientInternalException( - "already tried all address on file, but not leader found yet."); + if (pdClientWrapper == null) { + throw new TiClientInternalException( + "already tried all address on file, but not leader found yet."); + } } private synchronized void tryUpdateMembers(List members) { @@ -541,6 +549,9 @@ protected PDStub getAsyncStub() { return pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); } + @Override + protected void tryUpdateProxy() {} + private void initCluster() { GetMembersResponse resp = null; List pdAddrs = getConf().getPdAddrs(); @@ -558,6 +569,9 @@ private void initCluster() { this.hostMapping = Optional.ofNullable(getConf().getHostMapping()) .orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName())); + // The first request may cost too much latency + long originTimeout = this.timeout; + this.timeout = 2000; for (URI u : pdAddrs) { resp = getMembers(u); if (resp != null) { @@ -565,6 +579,7 @@ private void initCluster() { } logger.info("Could not get leader member with pd: " + u); } + this.timeout = originTimeout; checkNotNull(resp, "Failed to init client for PD cluster."); long clusterId = resp.getHeader().getClusterId(); header = RequestHeader.newBuilder().setClusterId(clusterId).build(); @@ -654,7 +669,7 @@ long getCreateTime() { @Override public String toString() { - return "[leaderInfo: " + leaderInfo + "]"; + return "[leaderInfo: " + leaderInfo + ", storeAddress: " + storeAddress + "]"; } } diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 8344f2b0fe6..4ee8eac9c54 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -54,6 +54,7 @@ private static void loadFromSystemProperties() { private static void loadFromDefaultProperties() { setIfMissing(TIKV_PD_ADDRESSES, DEF_PD_ADDRESSES); setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT); + setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE); setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE); @@ -237,6 +238,7 @@ private static ReplicaRead getReplicaRead(String key) { } private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT); + private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT); private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT); private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE); private List pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES); @@ -334,6 +336,15 @@ public TiConfiguration setTimeout(long timeout) { return this; } + public long getForwardTimeout() { + return forwardTimeout; + } + + public TiConfiguration setForwardTimeout(long timeout) { + this.forwardTimeout = timeout; + return this; + } + public long getScanTimeout() { return scanTimeout; } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index f495c2bc44c..a7b6c543085 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -79,6 +79,9 @@ public TiSession(TiConfiguration conf) { this.client = PDClient.createRaw(conf, channelFactory); this.enableGrpcForward = conf.getEnableGrpcForward(); this.metricsServer = MetricsServer.getInstance(conf); + if (this.enableGrpcForward) { + logger.info("enable grpc forward for high available"); + } logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 10744ee8d28..32b0a3cf2e1 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -169,8 +169,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { public boolean handleRequestError(BackOffer backOffer, Exception e) { if (recv.onStoreUnreachable()) { return true; - } else { - regionManager.onRequestFail(recv.getRegion()); } backOffer.doBackOff( diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 2b28b3f0874..4748a51070e 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -28,6 +28,8 @@ import io.grpc.stub.MetadataUtils; import java.util.List; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; @@ -38,10 +40,13 @@ public abstract class AbstractRegionStoreClient extends AbstractGRPCClient implements RegionErrorReceiver { + private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); protected final RegionManager regionManager; protected TiRegion region; protected TiStore targetStore; + protected TiStore originStore; + protected long retryTimes; protected AbstractRegionStoreClient( TiConfiguration conf, @@ -58,6 +63,11 @@ protected AbstractRegionStoreClient( this.region = region; this.regionManager = regionManager; this.targetStore = store; + this.originStore = null; + this.retryTimes = 0; + if (this.targetStore.getProxyStore() != null) { + this.timeout = conf.getForwardTimeout(); + } } public TiRegion getRegion() { @@ -106,26 +116,46 @@ public boolean onNotLeader(TiRegion newRegion) { @Override public boolean onStoreUnreachable() { if (!conf.getEnableGrpcForward()) { + regionManager.onRequestFail(region); return false; } - if (region.getProxyStore() == null) { + if (targetStore.getProxyStore() == null) { if (!targetStore.isUnreachable()) { - if (checkHealth(targetStore)) { + if (checkHealth(targetStore.getStore())) { return true; - } else { - if (targetStore.markUnreachable()) { - this.regionManager.scheduleHealthCheckJob(targetStore); - } } } + } else if (retryTimes > region.getFollowerList().size()) { + logger.warn( + String.format( + "retry time exceed for region[%d], invalid this region and store[%d]", + region.getId(), targetStore.getId())); + regionManager.onRequestFail(region); + return false; } - TiRegion proxyRegion = switchProxyStore(); - if (proxyRegion == null) { + TiStore proxyStore = switchProxyStore(); + if (proxyStore == null) { + logger.warn( + String.format( + "no forward store can be selected for store [%s] and region[%d]", + targetStore.getStore().getAddress(), region.getId())); return false; } - regionManager.updateRegion(region, proxyRegion); - region = proxyRegion; - String addressStr = region.getProxyStore().getStore().getAddress(); + if (originStore == null) { + originStore = targetStore; + if (this.targetStore.getProxyStore() != null) { + this.timeout = conf.getForwardTimeout(); + } + } + targetStore = proxyStore; + retryTimes += 1; + logger.warn( + String.format( + "forward request to store [%s] by store [%s] for region[%d]", + targetStore.getStore().getAddress(), + targetStore.getProxyStore().getAddress(), + region.getId())); + String addressStr = targetStore.getProxyStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); Metadata header = new Metadata(); @@ -135,11 +165,15 @@ public boolean onStoreUnreachable() { return true; } - private boolean checkHealth(TiStore store) { - if (store.getStore() == null) { - return false; + @Override + protected void tryUpdateProxy() { + if (originStore != null) { + regionManager.updateStore(originStore, targetStore); } - String addressStr = store.getStore().getAddress(); + } + + private boolean checkHealth(Metapb.Store store) { + String addressStr = store.getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); HealthGrpc.HealthBlockingStub stub = @@ -157,26 +191,25 @@ private boolean checkHealth(TiStore store) { return true; } - private TiRegion switchProxyStore() { + private TiStore switchProxyStore() { boolean hasVisitedStore = false; List peers = region.getFollowerList(); for (int i = 0; i < peers.size() * 2; i++) { int idx = i % peers.size(); Metapb.Peer peer = peers.get(idx); if (peer.getStoreId() != region.getLeader().getStoreId()) { - if (region.getProxyStore() == null) { + if (targetStore.getProxyStore() == null) { TiStore store = regionManager.getStoreById(peer.getStoreId()); - if (checkHealth(store)) { - return region.switchProxyStore(store); + if (checkHealth(store.getStore())) { + return targetStore.withProxy(store.getStore()); } } else { - TiStore proxyStore = region.getProxyStore(); - if (peer.getStoreId() == proxyStore.getStore().getId()) { + if (peer.getStoreId() == targetStore.getProxyStore().getId()) { hasVisitedStore = true; } else if (hasVisitedStore) { - proxyStore = regionManager.getStoreById(peer.getStoreId()); - if (!proxyStore.isUnreachable() && checkHealth(proxyStore)) { - return region.switchProxyStore(proxyStore); + TiStore proxyStore = regionManager.getStoreById(peer.getStoreId()); + if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) { + return targetStore.withProxy(proxyStore.getStore()); } } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index e4d769911ea..662b8460e59 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -215,8 +215,16 @@ public synchronized TiRegion updateLeader(TiRegion region, long storeId) { return null; } - public synchronized boolean updateRegion(TiRegion oldRegion, TiRegion region) { - return cache.updateRegion(oldRegion, region); + public synchronized void updateStore(TiStore oldStore, TiStore newStore) { + if (cache.updateStore(oldStore, newStore)) { + if (newStore.isUnreachable()) { + logger.warn( + String.format( + "check health for store [%s] in background thread", + newStore.getStore().getAddress())); + this.storeChecker.scheduleStoreHealthCheck(newStore); + } + } } /** Clears all cache when some unexpected error occurs. */ @@ -229,15 +237,8 @@ public void clearRegionCache() { * * @param region region */ - public void onRequestFail(TiRegion region) { - onRequestFail(region, region.getLeader().getStoreId()); - } - - private void onRequestFail(TiRegion region, long storeId) { - if (this.storeChecker != null) { - cache.invalidateRegion(region); - cache.invalidateAllRegionForStore(storeId); - } + public synchronized void onRequestFail(TiRegion region) { + cache.invalidateRegion(region); } public void invalidateStore(long storeId) { @@ -248,10 +249,6 @@ public void invalidateRegion(TiRegion region) { cache.invalidateRegion(region); } - public void scheduleHealthCheckJob(TiStore store) { - this.storeChecker.scheduleStoreHealthCheck(store); - } - public static class RegionCache { private final Map regionCache; private final Map storeCache; @@ -370,10 +367,29 @@ public synchronized boolean updateRegion(TiRegion expected, TiRegion region) { } } - public synchronized void invalidateAllRegionForStore(long storeId) { + public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) { + TiStore originStore = storeCache.get(oldStore.getId()); + if (originStore == oldStore) { + storeCache.put(newStore.getId(), newStore); + if (oldStore != null && oldStore.isUnreachable()) { + oldStore.markReachable(); + } + if (newStore.getProxyStore() != null) { + newStore.markUnreachable(); + } + return true; + } + return false; + } + + public synchronized void invalidateAllRegionForStore(TiStore store) { + TiStore oldStore = storeCache.get(store.getId()); + if (oldStore != store) { + return; + } List regionToRemove = new ArrayList<>(); for (TiRegion r : regionCache.values()) { - if (r.getLeader().getStoreId() == storeId) { + if (r.getLeader().getStoreId() == store.getId()) { if (logger.isDebugEnabled()) { logger.debug(String.format("invalidateAllRegionForStore Region[%s]", r)); } @@ -381,6 +397,7 @@ public synchronized void invalidateAllRegionForStore(long storeId) { } } + logger.warn(String.format("invalid store [%d]", store.getId())); // remove region for (TiRegion r : regionToRemove) { keyToRegionIdCache.remove(makeRange(r.getStartKey(), r.getEndKey())); @@ -421,7 +438,7 @@ private List getRegionStore(List peers, BackOffer backOffe private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { List peers = region.getPeersList(); List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, leader, peers, stores, null); + return new TiRegion(conf, region, leader, peers, stores); } public synchronized void clearAll() { diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 46d5efec444..5839c14e977 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -1269,8 +1269,8 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store TikvBlockingStub blockingStub = null; TikvStub asyncStub = null; - if (conf.getEnableGrpcForward() && region.getProxyStore() != null && store.isUnreachable()) { - addressStr = region.getProxyStore().getStore().getAddress(); + if (conf.getEnableGrpcForward() && store.getProxyStore() != null && store.isUnreachable()) { + addressStr = store.getProxyStore().getAddress(); channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); Metadata header = new Metadata(); @@ -1278,13 +1278,16 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); } else { - // If the store is reachable, which is update by check-health thread + // If the store is reachable, which is update by check-health thread, cancel proxy forward. if (!store.isUnreachable()) { - if (region.getProxyStore() != null) { - TiRegion newRegion = region.switchProxyStore(null); - if (regionManager.updateRegion(region, newRegion)) { - region = newRegion; - } + if (store.getProxyStore() != null) { + logger.warn( + String.format( + "cancel request to store [%s] forward by store[%s]", + store.getStore().getAddress(), store.getProxyStore().getAddress())); + TiStore newStore = store.withProxy(null); + regionManager.updateStore(store, newStore); + store = newStore; } } channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 2fbb980612d..f7b736dbe2f 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -49,18 +49,12 @@ public class TiRegion implements Serializable { private final Peer leader; private final ReplicaSelector replicaSelector; private final List replicaList; - private final TiStore proxyStore; private int replicaIdx; private final List peers; private final List stores; public TiRegion( - TiConfiguration conf, - Region meta, - Peer leader, - List peers, - List stores, - TiStore proxyStore) { + TiConfiguration conf, Region meta, Peer leader, List peers, List stores) { this.conf = Objects.requireNonNull(conf, "conf is null"); this.meta = Objects.requireNonNull(meta, "meta is null"); this.isolationLevel = conf.getIsolationLevel(); @@ -68,7 +62,6 @@ public TiRegion( this.peers = peers; this.stores = stores; this.replicaSelector = conf.getReplicaSelector(); - this.proxyStore = proxyStore; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { throw new TiClientInternalException("Empty peer list for region " + meta.getId()); @@ -182,10 +175,6 @@ public RegionVerID getVerID() { meta.getId(), meta.getRegionEpoch().getConfVer(), meta.getRegionEpoch().getVersion()); } - public TiStore getProxyStore() { - return proxyStore; - } - /** * switches current peer to the one on specific store. It return false if no peer matches the * storeID. @@ -197,16 +186,12 @@ public TiRegion switchPeer(long leaderStoreID) { List peers = meta.getPeersList(); for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { - return new TiRegion(this.conf, this.meta, p, peers, this.stores, this.proxyStore); + return new TiRegion(this.conf, this.meta, p, peers, this.stores); } } return null; } - public TiRegion switchProxyStore(TiStore store) { - return new TiRegion(this.conf, this.meta, this.leader, this.peers, this.stores, store); - } - public boolean isMoreThan(ByteString key) { return FastByteComparisons.compareTo( meta.getStartKey().toByteArray(), diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index db1f2f30443..f6c7fc80ebb 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -5,15 +5,27 @@ public class TiStore { private final Metapb.Store store; + private final Metapb.Store proxyStore; private AtomicBoolean unreachable; public TiStore(Metapb.Store store) { this.store = store; this.unreachable = new AtomicBoolean(false); + this.proxyStore = null; } - public boolean markUnreachable() { - return this.unreachable.compareAndSet(false, true); + private TiStore(Metapb.Store store, Metapb.Store proxyStore) { + this.store = store; + this.unreachable = new AtomicBoolean(false); + this.proxyStore = proxyStore; + } + + public TiStore withProxy(Metapb.Store proxyStore) { + return new TiStore(this.store, proxyStore); + } + + public void markUnreachable() { + this.unreachable.set(true); } public void markReachable() { @@ -28,6 +40,14 @@ public Metapb.Store getStore() { return this.store; } + public String getAddress() { + return this.store.getAddress(); + } + + public Metapb.Store getProxyStore() { + return this.proxyStore; + } + public long getId() { return this.store.getId(); } diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java index c13d2ba0c13..11ea49b639d 100644 --- a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java +++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java @@ -9,12 +9,15 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Metapb; public class UnreachableStoreChecker implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(UnreachableStoreChecker.class); private ConcurrentHashMap stores; private BlockingQueue taskQueue; private final ChannelFactory channelFactory; @@ -67,6 +70,9 @@ public void run() { HealthCheckResponse resp = stub.check(req); if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { store.markReachable(); + logger.warn( + String.format("store [%s] recovers to be reachable", store.getStore().getAddress())); + this.stores.remove(Long.valueOf(store.getId())); continue; } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 87cac1361b9..79974b60c98 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -672,6 +672,7 @@ private void doSendBatchPut(BackOffer backOffer, Map kvP private List doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) { try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) { + client.setTimeout(conf.getScanTimeout()); client.rawBatchPut(backOffer, batch, ttl, atomicForCAS); return new ArrayList<>(); } catch (final TiKVException e) { diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index d3fd36f1fe4..c99688729d2 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -44,8 +44,7 @@ public void setUp() throws IOException { r, r.getPeers(0), r.getPeersList(), - s.stream().map(TiStore::new).collect(Collectors.toList()), - null); + s.stream().map(TiStore::new).collect(Collectors.toList())); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build());