From b31e5143873f841c4e7353e039fd65c1a5167263 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 10:57:37 +0800 Subject: [PATCH 01/10] optimize proxy store Signed-off-by: Little-Wallace --- .../org/tikv/common/AbstractGRPCClient.java | 5 ++ src/main/java/org/tikv/common/PDClient.java | 3 ++ .../region/AbstractRegionStoreClient.java | 51 ++++++++++--------- .../org/tikv/common/region/RegionManager.java | 24 ++++++--- .../tikv/common/region/RegionStoreClient.java | 13 +++-- .../java/org/tikv/common/region/TiRegion.java | 19 +------ .../java/org/tikv/common/region/TiStore.java | 24 ++++++++- .../java/org/tikv/common/MockServerTest.java | 3 +- 8 files changed, 83 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 813b98629aa..7f1de71a409 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -89,6 +89,9 @@ public RespT callWithRetry( stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); }, method.getFullMethodName()); + if (resp != null && this.conf.getEnableGrpcForward()) { + tryUpdateProxy(); + } if (logger.isTraceEnabled()) { logger.trace(String.format("leaving %s...", method.getFullMethodName())); @@ -177,6 +180,8 @@ public long getTimeout() { protected abstract StubT getAsyncStub(); + protected abstract void tryUpdateProxy(); + protected boolean checkHealth(String addressStr, HostMapping hostMapping) { ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); HealthGrpc.HealthBlockingStub stub = diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 23c5d459664..f399a21ab08 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -541,6 +541,9 @@ protected PDStub getAsyncStub() { return 
pdClientWrapper.getAsyncStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); } + @Override + protected void tryUpdateProxy() {} + private void initCluster() { GetMembersResponse resp = null; List pdAddrs = getConf().getPdAddrs(); diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 2b28b3f0874..7b008dac171 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -42,6 +42,7 @@ public abstract class AbstractRegionStoreClient protected final RegionManager regionManager; protected TiRegion region; protected TiStore targetStore; + protected TiStore originStore; protected AbstractRegionStoreClient( TiConfiguration conf, @@ -58,6 +59,7 @@ protected AbstractRegionStoreClient( this.region = region; this.regionManager = regionManager; this.targetStore = store; + this.originStore = null; } public TiRegion getRegion() { @@ -108,24 +110,22 @@ public boolean onStoreUnreachable() { if (!conf.getEnableGrpcForward()) { return false; } - if (region.getProxyStore() == null) { + if (targetStore.getProxyStore() == null) { if (!targetStore.isUnreachable()) { - if (checkHealth(targetStore)) { + if (checkHealth(targetStore.getStore())) { return true; - } else { - if (targetStore.markUnreachable()) { - this.regionManager.scheduleHealthCheckJob(targetStore); - } } } } - TiRegion proxyRegion = switchProxyStore(); - if (proxyRegion == null) { + TiStore proxyStore = switchProxyStore(); + if (proxyStore == null) { return false; } - regionManager.updateRegion(region, proxyRegion); - region = proxyRegion; - String addressStr = region.getProxyStore().getStore().getAddress(); + if (originStore == null) { + originStore = targetStore; + } + targetStore = proxyStore; + String addressStr = targetStore.getProxyStore().getAddress(); ManagedChannel channel = 
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); Metadata header = new Metadata(); @@ -135,11 +135,15 @@ public boolean onStoreUnreachable() { return true; } - private boolean checkHealth(TiStore store) { - if (store.getStore() == null) { - return false; + @Override + protected void tryUpdateProxy() { + if (originStore != null) { + regionManager.updateStore(originStore, targetStore); } - String addressStr = store.getStore().getAddress(); + } + + private boolean checkHealth(Metapb.Store store) { + String addressStr = store.getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); HealthGrpc.HealthBlockingStub stub = @@ -157,26 +161,25 @@ private boolean checkHealth(TiStore store) { return true; } - private TiRegion switchProxyStore() { + private TiStore switchProxyStore() { boolean hasVisitedStore = false; List peers = region.getFollowerList(); for (int i = 0; i < peers.size() * 2; i++) { int idx = i % peers.size(); Metapb.Peer peer = peers.get(idx); if (peer.getStoreId() != region.getLeader().getStoreId()) { - if (region.getProxyStore() == null) { + if (targetStore.getProxyStore() == null) { TiStore store = regionManager.getStoreById(peer.getStoreId()); - if (checkHealth(store)) { - return region.switchProxyStore(store); + if (checkHealth(store.getStore())) { + return targetStore.withProxy(store.getStore()); } } else { - TiStore proxyStore = region.getProxyStore(); - if (peer.getStoreId() == proxyStore.getStore().getId()) { + if (peer.getStoreId() == targetStore.getStore().getId()) { hasVisitedStore = true; } else if (hasVisitedStore) { - proxyStore = regionManager.getStoreById(peer.getStoreId()); - if (!proxyStore.isUnreachable() && checkHealth(proxyStore)) { - return region.switchProxyStore(proxyStore); + TiStore proxyStore = regionManager.getStoreById(peer.getStoreId()); + if (!proxyStore.isUnreachable() && checkHealth(proxyStore.getStore())) { + return 
targetStore.withProxy(proxyStore.getStore()); } } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index e4d769911ea..062327135e0 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -215,8 +215,10 @@ public synchronized TiRegion updateLeader(TiRegion region, long storeId) { return null; } - public synchronized boolean updateRegion(TiRegion oldRegion, TiRegion region) { - return cache.updateRegion(oldRegion, region); + public synchronized void updateStore(TiStore oldStore, TiStore newStore) { + if (cache.updateStore(oldStore, newStore)) { + this.storeChecker.scheduleStoreHealthCheck(newStore); + } } /** Clears all cache when some unexpected error occurs. */ @@ -248,10 +250,6 @@ public void invalidateRegion(TiRegion region) { cache.invalidateRegion(region); } - public void scheduleHealthCheckJob(TiStore store) { - this.storeChecker.scheduleStoreHealthCheck(store); - } - public static class RegionCache { private final Map regionCache; private final Map storeCache; @@ -370,6 +368,18 @@ public synchronized boolean updateRegion(TiRegion expected, TiRegion region) { } } + public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) { + TiStore originStore = storeCache.get(oldStore.getId()); + if (originStore == oldStore) { + storeCache.put(newStore.getId(), newStore); + if (newStore.getProxyStore() != null) { + newStore.markUnreachable(); + return true; + } + } + return false; + } + public synchronized void invalidateAllRegionForStore(long storeId) { List regionToRemove = new ArrayList<>(); for (TiRegion r : regionCache.values()) { @@ -421,7 +431,7 @@ private List getRegionStore(List peers, BackOffer backOffe private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { List peers = region.getPeersList(); List stores = getRegionStore(peers, backOffer); - return new 
TiRegion(conf, region, leader, peers, stores, null); + return new TiRegion(conf, region, leader, peers, stores); } public synchronized void clearAll() { diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 46d5efec444..dc676827b33 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -1269,8 +1269,8 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store TikvBlockingStub blockingStub = null; TikvStub asyncStub = null; - if (conf.getEnableGrpcForward() && region.getProxyStore() != null && store.isUnreachable()) { - addressStr = region.getProxyStore().getStore().getAddress(); + if (conf.getEnableGrpcForward() && store.getProxyStore() != null && store.isUnreachable()) { + addressStr = store.getProxyStore().getAddress(); channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); Metadata header = new Metadata(); @@ -1280,11 +1280,10 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store } else { // If the store is reachable, which is update by check-health thread if (!store.isUnreachable()) { - if (region.getProxyStore() != null) { - TiRegion newRegion = region.switchProxyStore(null); - if (regionManager.updateRegion(region, newRegion)) { - region = newRegion; - } + if (store.getProxyStore() != null) { + TiStore newStore = store.withProxy(null); + regionManager.updateStore(store, newStore); + store = newStore; } } channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 2fbb980612d..f7b736dbe2f 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -49,18 +49,12 @@ public class TiRegion implements 
Serializable { private final Peer leader; private final ReplicaSelector replicaSelector; private final List replicaList; - private final TiStore proxyStore; private int replicaIdx; private final List peers; private final List stores; public TiRegion( - TiConfiguration conf, - Region meta, - Peer leader, - List peers, - List stores, - TiStore proxyStore) { + TiConfiguration conf, Region meta, Peer leader, List peers, List stores) { this.conf = Objects.requireNonNull(conf, "conf is null"); this.meta = Objects.requireNonNull(meta, "meta is null"); this.isolationLevel = conf.getIsolationLevel(); @@ -68,7 +62,6 @@ public TiRegion( this.peers = peers; this.stores = stores; this.replicaSelector = conf.getReplicaSelector(); - this.proxyStore = proxyStore; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { throw new TiClientInternalException("Empty peer list for region " + meta.getId()); @@ -182,10 +175,6 @@ public RegionVerID getVerID() { meta.getId(), meta.getRegionEpoch().getConfVer(), meta.getRegionEpoch().getVersion()); } - public TiStore getProxyStore() { - return proxyStore; - } - /** * switches current peer to the one on specific store. It return false if no peer matches the * storeID. 
@@ -197,16 +186,12 @@ public TiRegion switchPeer(long leaderStoreID) { List peers = meta.getPeersList(); for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { - return new TiRegion(this.conf, this.meta, p, peers, this.stores, this.proxyStore); + return new TiRegion(this.conf, this.meta, p, peers, this.stores); } } return null; } - public TiRegion switchProxyStore(TiStore store) { - return new TiRegion(this.conf, this.meta, this.leader, this.peers, this.stores, store); - } - public boolean isMoreThan(ByteString key) { return FastByteComparisons.compareTo( meta.getStartKey().toByteArray(), diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index db1f2f30443..17ef70935d0 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -5,15 +5,27 @@ public class TiStore { private final Metapb.Store store; + private final Metapb.Store proxyStore; private AtomicBoolean unreachable; public TiStore(Metapb.Store store) { this.store = store; this.unreachable = new AtomicBoolean(false); + this.proxyStore = null; } - public boolean markUnreachable() { - return this.unreachable.compareAndSet(false, true); + private TiStore(Metapb.Store store, Metapb.Store proxyStore, boolean unreachable) { + this.store = store; + this.unreachable = new AtomicBoolean(unreachable); + this.proxyStore = proxyStore; + } + + public TiStore withProxy(Metapb.Store proxyStore) { + return new TiStore(this.store, proxyStore, this.unreachable.get()); + } + + public void markUnreachable() { + this.unreachable.set(true); } public void markReachable() { @@ -28,6 +40,14 @@ public Metapb.Store getStore() { return this.store; } + public String getAddress() { + return this.store.getAddress(); + } + + public Metapb.Store getProxyStore() { + return this.proxyStore; + } + public long getId() { return this.store.getId(); } diff --git a/src/test/java/org/tikv/common/MockServerTest.java 
b/src/test/java/org/tikv/common/MockServerTest.java index d3fd36f1fe4..c99688729d2 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -44,8 +44,7 @@ public void setUp() throws IOException { r, r.getPeers(0), r.getPeersList(), - s.stream().map(TiStore::new).collect(Collectors.toList()), - null); + s.stream().map(TiStore::new).collect(Collectors.toList())); pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); for (Metapb.Store store : s) { pdServer.addGetStoreResp(Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); From 182381977cbb1e49a892820a97148bf7093d3e5b Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 11:11:09 +0800 Subject: [PATCH 02/10] use high latency for forward request Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/ConfigUtils.java | 4 +++- src/main/java/org/tikv/common/PDClient.java | 2 ++ src/main/java/org/tikv/common/TiConfiguration.java | 11 +++++++++++ .../tikv/common/region/AbstractRegionStoreClient.java | 3 +++ src/main/java/org/tikv/raw/RawKVClient.java | 1 + 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 0c2460e566e..1ffe00c5d4d 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -20,6 +20,7 @@ public class ConfigUtils { public static final String TIKV_PD_ADDRESSES = "tikv.pd.addresses"; public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms"; + public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size"; public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size"; @@ -54,7 +55,8 @@ public 
class ConfigUtils { public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; - public static final String DEF_TIMEOUT = "600ms"; + public static final String DEF_TIMEOUT = "150ms"; + public static final String DEF_FORWARD_TIMEOUT = "600ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; public static final int DEF_CHECK_HEALTH_TIMEOUT = 40; public static final int DEF_SCAN_BATCH_SIZE = 10240; diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index f399a21ab08..963ad760413 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -361,6 +361,7 @@ private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) { ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping); pdClientWrapper = new PDClientWrapper(leaderUrlStr, leaderUrlStr, clientChannel, System.nanoTime()); + timeout = conf.getTimeout(); } catch (IllegalArgumentException e) { logger.error("Error updating leader. " + leaderUrlStr, e); return false; @@ -380,6 +381,7 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l // create new Leader ManagedChannel channel = channelFactory.getChannel(followerUrlStr, hostMapping); pdClientWrapper = new PDClientWrapper(leaderUrls, followerUrlStr, channel, System.nanoTime()); + timeout = conf.getForwardTimeout(); } catch (IllegalArgumentException e) { logger.error("Error updating follower. 
" + followerUrlStr, e); return false; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 8344f2b0fe6..4ee8eac9c54 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -54,6 +54,7 @@ private static void loadFromSystemProperties() { private static void loadFromDefaultProperties() { setIfMissing(TIKV_PD_ADDRESSES, DEF_PD_ADDRESSES); setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT); + setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE); setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE); @@ -237,6 +238,7 @@ private static ReplicaRead getReplicaRead(String key) { } private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT); + private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT); private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT); private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE); private List pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES); @@ -334,6 +336,15 @@ public TiConfiguration setTimeout(long timeout) { return this; } + public long getForwardTimeout() { + return forwardTimeout; + } + + public TiConfiguration setForwardTimeout(long timeout) { + this.forwardTimeout = timeout; + return this; + } + public long getScanTimeout() { return scanTimeout; } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 7b008dac171..83b4a2e0785 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -60,6 +60,9 @@ protected AbstractRegionStoreClient( this.regionManager = regionManager; this.targetStore = store; this.originStore = null; + if (this.targetStore.getProxyStore() != null) { + 
this.timeout = conf.getForwardTimeout(); + } } public TiRegion getRegion() { diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 87cac1361b9..79974b60c98 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -672,6 +672,7 @@ private void doSendBatchPut(BackOffer backOffer, Map kvP private List doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) { try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) { + client.setTimeout(conf.getScanTimeout()); client.rawBatchPut(backOffer, batch, ttl, atomicForCAS); return new ArrayList<>(); } catch (final TiKVException e) { From 86ad9973c46a6e2cd5caf6e97c6f80d08aac8de4 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 28 Jun 2021 20:38:16 +0800 Subject: [PATCH 03/10] add some log Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/PDClient.java | 2 ++ src/main/java/org/tikv/common/TiSession.java | 3 +++ .../common/region/AbstractRegionStoreClient.java | 9 +++++++++ .../java/org/tikv/common/region/RegionManager.java | 13 +++++++++++-- .../org/tikv/common/region/RegionStoreClient.java | 6 +++++- .../tikv/common/region/UnreachableStoreChecker.java | 6 ++++++ 6 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 963ad760413..8bc2be66930 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -433,6 +433,8 @@ public synchronized void updateLeaderOrforwardFollower() { continue; } if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) { + logger.warn( + String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr)); return; } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 
f495c2bc44c..a7b6c543085 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -79,6 +79,9 @@ public TiSession(TiConfiguration conf) { this.client = PDClient.createRaw(conf, channelFactory); this.enableGrpcForward = conf.getEnableGrpcForward(); this.metricsServer = MetricsServer.getInstance(conf); + if (this.enableGrpcForward) { + logger.info("enable grpc forward for high available"); + } logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 83b4a2e0785..9a3680b6b50 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -28,6 +28,8 @@ import io.grpc.stub.MetadataUtils; import java.util.List; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; @@ -38,6 +40,7 @@ public abstract class AbstractRegionStoreClient extends AbstractGRPCClient implements RegionErrorReceiver { + private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); protected final RegionManager regionManager; protected TiRegion region; @@ -127,6 +130,12 @@ public boolean onStoreUnreachable() { if (originStore == null) { originStore = targetStore; } + logger.warn( + String.format( + "forward request to store [%s] by store [%s] for region[%d]", + targetStore.getStore().getAddress(), + targetStore.getProxyStore().getAddress(), + region.getId())); targetStore = proxyStore; String addressStr = targetStore.getProxyStore().getAddress(); ManagedChannel channel = diff --git a/src/main/java/org/tikv/common/region/RegionManager.java 
b/src/main/java/org/tikv/common/region/RegionManager.java index 062327135e0..e8ef821a640 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -217,7 +217,13 @@ public synchronized TiRegion updateLeader(TiRegion region, long storeId) { public synchronized void updateStore(TiStore oldStore, TiStore newStore) { if (cache.updateStore(oldStore, newStore)) { - this.storeChecker.scheduleStoreHealthCheck(newStore); + logger.warn( + String.format( + "check health for store [%s] in background thread", + newStore.getStore().getAddress())); + if (newStore.isUnreachable()) { + this.storeChecker.scheduleStoreHealthCheck(newStore); + } } } @@ -372,10 +378,13 @@ public synchronized boolean updateStore(TiStore oldStore, TiStore newStore) { TiStore originStore = storeCache.get(oldStore.getId()); if (originStore == oldStore) { storeCache.put(newStore.getId(), newStore); + if (oldStore != null && oldStore.isUnreachable()) { + oldStore.markReachable(); + } if (newStore.getProxyStore() != null) { newStore.markUnreachable(); - return true; } + return true; } return false; } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index dc676827b33..5839c14e977 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -1278,9 +1278,13 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); } else { - // If the store is reachable, which is update by check-health thread + // If the store is reachable, which is update by check-health thread, cancel proxy forward. 
if (!store.isUnreachable()) { if (store.getProxyStore() != null) { + logger.warn( + String.format( + "cancel request to store [%s] forward by store[%s]", + store.getStore().getAddress(), store.getProxyStore().getAddress())); TiStore newStore = store.withProxy(null); regionManager.updateStore(store, newStore); store = newStore; diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java index c13d2ba0c13..11ea49b639d 100644 --- a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java +++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java @@ -9,12 +9,15 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Metapb; public class UnreachableStoreChecker implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(UnreachableStoreChecker.class); private ConcurrentHashMap stores; private BlockingQueue taskQueue; private final ChannelFactory channelFactory; @@ -67,6 +70,9 @@ public void run() { HealthCheckResponse resp = stub.check(req); if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { store.markReachable(); + logger.warn( + String.format("store [%s] recovers to be reachable", store.getStore().getAddress())); + this.stores.remove(Long.valueOf(store.getId())); continue; } From 43971dae78b4fc05f5d3529a5b6e5045656f3577 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 16:01:33 +0800 Subject: [PATCH 04/10] fix bug Signed-off-by: Little-Wallace --- .../java/org/tikv/common/region/AbstractRegionStoreClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 9a3680b6b50..c878c5797e2 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -130,13 +130,13 @@ public boolean onStoreUnreachable() { if (originStore == null) { originStore = targetStore; } + targetStore = proxyStore; logger.warn( String.format( "forward request to store [%s] by store [%s] for region[%d]", targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress(), region.getId())); - targetStore = proxyStore; String addressStr = targetStore.getProxyStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); From f705a6c72837d226fb8be478f29783dc046a0c8e Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 16:21:42 +0800 Subject: [PATCH 05/10] fix some log Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/PDClient.java | 2 +- src/main/java/org/tikv/common/region/RegionManager.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 8bc2be66930..f14e4def564 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -335,7 +335,7 @@ private GetMembersResponse getMembers(URI uri) { } return resp; } catch (Exception e) { - logger.warn("failed to get member from pd server.", e); + logger.debug("failed to get member from pd server.", e); } return null; } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index e8ef821a640..5feddf9d47f 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java 
@@ -217,11 +217,11 @@ public synchronized TiRegion updateLeader(TiRegion region, long storeId) { public synchronized void updateStore(TiStore oldStore, TiStore newStore) { if (cache.updateStore(oldStore, newStore)) { - logger.warn( - String.format( - "check health for store [%s] in background thread", - newStore.getStore().getAddress())); if (newStore.isUnreachable()) { + logger.warn( + String.format( + "check health for store [%s] in background thread", + newStore.getStore().getAddress())); this.storeChecker.scheduleStoreHealthCheck(newStore); } } From 7cc75aa2faab73b68f7ca9a72788c9d912dae354 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 17:10:41 +0800 Subject: [PATCH 06/10] do not try switch too many times Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/PDClient.java | 9 ++++++--- .../common/region/AbstractRegionStoreClient.java | 12 ++++++++++++ .../java/org/tikv/common/region/RegionManager.java | 13 +++++++------ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index f14e4def564..1369a887feb 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -342,6 +342,7 @@ private GetMembersResponse getMembers(URI uri) { // return whether the leader has changed to target address `leaderUrlStr`. synchronized boolean trySwitchLeader(String leaderUrlStr) { + if (pdClientWrapper != null) { if (leaderUrlStr.equals(pdClientWrapper.getLeaderInfo())) { // The message to leader is not forwarded by follower. 
@@ -468,8 +469,10 @@ public void tryUpdateLeader() { return; } } - throw new TiClientInternalException( - "already tried all address on file, but not leader found yet."); + if (pdClientWrapper == null) { + throw new TiClientInternalException( + "already tried all address on file, but not leader found yet."); + } } private synchronized void tryUpdateMembers(List members) { @@ -661,7 +664,7 @@ long getCreateTime() { @Override public String toString() { - return "[leaderInfo: " + leaderInfo + "]"; + return "[leaderInfo: " + leaderInfo + ", storeAddress: " + storeAddress + "]"; } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index c878c5797e2..7bc4d1f3dc4 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -46,6 +46,7 @@ public abstract class AbstractRegionStoreClient protected TiRegion region; protected TiStore targetStore; protected TiStore originStore; + protected long retryTimes; protected AbstractRegionStoreClient( TiConfiguration conf, @@ -63,6 +64,7 @@ protected AbstractRegionStoreClient( this.regionManager = regionManager; this.targetStore = store; this.originStore = null; + this.retryTimes = 0; if (this.targetStore.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); } @@ -122,6 +124,12 @@ public boolean onStoreUnreachable() { return true; } } + } else if (retryTimes > region.getFollowerList().size()) { + logger.warn( + String.format( + "retry time exceed for region[%d], invalid this region and store[%d]", + region.getId(), targetStore.getId())); + return false; } TiStore proxyStore = switchProxyStore(); if (proxyStore == null) { @@ -129,8 +137,12 @@ public boolean onStoreUnreachable() { } if (originStore == null) { originStore = targetStore; + if (this.targetStore.getProxyStore() != null) { + this.timeout = conf.getForwardTimeout(); + 
} } targetStore = proxyStore; + retryTimes += 1; logger.warn( String.format( "forward request to store [%s] by store [%s] for region[%d]", diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 5feddf9d47f..2a05b7f5bc2 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -219,9 +219,9 @@ public synchronized void updateStore(TiStore oldStore, TiStore newStore) { if (cache.updateStore(oldStore, newStore)) { if (newStore.isUnreachable()) { logger.warn( - String.format( - "check health for store [%s] in background thread", - newStore.getStore().getAddress())); + String.format( + "check health for store [%s] in background thread", + newStore.getStore().getAddress())); this.storeChecker.scheduleStoreHealthCheck(newStore); } } @@ -237,13 +237,14 @@ public void clearRegionCache() { * * @param region region */ - public void onRequestFail(TiRegion region) { + public synchronized void onRequestFail(TiRegion region) { onRequestFail(region, region.getLeader().getStoreId()); } private void onRequestFail(TiRegion region, long storeId) { - if (this.storeChecker != null) { - cache.invalidateRegion(region); + logger.warn(String.format("invalid store [%d]", storeId)); + cache.invalidateRegion(region); + if (cache.storeCache.get(storeId) != null) { cache.invalidateAllRegionForStore(storeId); } } From 7b7afa95c45e841e457415822d9b7e91dd35daae Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 18:18:38 +0800 Subject: [PATCH 07/10] update retry strategy Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/PDClient.java | 2 +- .../common/operation/RegionErrorHandler.java | 2 -- .../region/AbstractRegionStoreClient.java | 10 +++++++++- .../org/tikv/common/region/RegionManager.java | 18 ++++++++++-------- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git 
a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 1369a887feb..a65a49d4681 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -342,7 +342,6 @@ private GetMembersResponse getMembers(URI uri) { // return whether the leader has changed to target address `leaderUrlStr`. synchronized boolean trySwitchLeader(String leaderUrlStr) { - if (pdClientWrapper != null) { if (leaderUrlStr.equals(pdClientWrapper.getLeaderInfo())) { // The message to leader is not forwarded by follower. @@ -414,6 +413,7 @@ public synchronized void updateLeaderOrforwardFollower() { continue; } + logger.info(String.format("can not switch to new leader, try follower forward")); List members = resp.getMembersList(); boolean hasReachNextMember = false; diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 10744ee8d28..32b0a3cf2e1 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -169,8 +169,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { public boolean handleRequestError(BackOffer backOffer, Exception e) { if (recv.onStoreUnreachable()) { return true; - } else { - regionManager.onRequestFail(recv.getRegion()); } backOffer.doBackOff( diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 7bc4d1f3dc4..90eed038630 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -116,6 +116,7 @@ public boolean onNotLeader(TiRegion newRegion) { @Override public boolean onStoreUnreachable() { if (!conf.getEnableGrpcForward()) { + regionManager.onRequestFail(region, targetStore); return 
false; } if (targetStore.getProxyStore() == null) { @@ -129,10 +130,17 @@ public boolean onStoreUnreachable() { String.format( "retry time exceed for region[%d], invalid this region and store[%d]", region.getId(), targetStore.getId())); + if (originStore != null) { + regionManager.onRequestFail(region, originStore); + } return false; } TiStore proxyStore = switchProxyStore(); if (proxyStore == null) { + logger.warn( + String.format( + "no forward store can be selected for store [%s] and region[%d]", + targetStore.getStore().getAddress(), region.getId())); return false; } if (originStore == null) { @@ -198,7 +206,7 @@ private TiStore switchProxyStore() { return targetStore.withProxy(store.getStore()); } } else { - if (peer.getStoreId() == targetStore.getStore().getId()) { + if (peer.getStoreId() == targetStore.getProxyStore().getId()) { hasVisitedStore = true; } else if (hasVisitedStore) { TiStore proxyStore = regionManager.getStoreById(peer.getStoreId()); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 2a05b7f5bc2..cb763b8a484 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -238,15 +238,12 @@ public void clearRegionCache() { * @param region region */ public synchronized void onRequestFail(TiRegion region) { - onRequestFail(region, region.getLeader().getStoreId()); + cache.invalidateRegion(region); } - private void onRequestFail(TiRegion region, long storeId) { - logger.warn(String.format("invalid store [%d]", storeId)); + public synchronized void onRequestFail(TiRegion region, TiStore store) { cache.invalidateRegion(region); - if (cache.storeCache.get(storeId) != null) { - cache.invalidateAllRegionForStore(storeId); - } + cache.invalidateAllRegionForStore(store); } public void invalidateStore(long storeId) { @@ -390,10 +387,14 @@ public synchronized boolean updateStore(TiStore oldStore, TiStore 
newStore) { return false; } - public synchronized void invalidateAllRegionForStore(long storeId) { + public synchronized void invalidateAllRegionForStore(TiStore store) { + TiStore oldStore = storeCache.get(store.getId()); + if (oldStore != store) { + return; + } List regionToRemove = new ArrayList<>(); for (TiRegion r : regionCache.values()) { - if (r.getLeader().getStoreId() == storeId) { + if (r.getLeader().getStoreId() == store.getId()) { if (logger.isDebugEnabled()) { logger.debug(String.format("invalidateAllRegionForStore Region[%s]", r)); } @@ -401,6 +402,7 @@ public synchronized void invalidateAllRegionForStore(long storeId) { } } + logger.warn(String.format("invalid store [%d]", store.getId())); // remove region for (TiRegion r : regionToRemove) { keyToRegionIdCache.remove(makeRange(r.getStartKey(), r.getEndKey())); From 5d7634d1b97f2f726233f0b070ec74591d707e55 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 19:59:21 +0800 Subject: [PATCH 08/10] set timeout for getmemberlist Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/PDClient.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index a65a49d4681..d093edb46dd 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -325,7 +325,8 @@ PDClientWrapper getPdClientWrapper() { private GetMembersResponse getMembers(URI uri) { try { ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); - PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan); + PDGrpc.PDBlockingStub stub = + PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); GetMembersRequest request = GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build(); GetMembersResponse resp = stub.getMembers(request); @@ -335,7 +336,7 @@ private GetMembersResponse 
getMembers(URI uri) { } return resp; } catch (Exception e) { - logger.debug("failed to get member from pd server.", e); + logger.warn("failed to get member from pd server.", e); } return null; } @@ -568,6 +569,9 @@ private void initCluster() { this.hostMapping = Optional.ofNullable(getConf().getHostMapping()) .orElseGet(() -> new DefaultHostMapping(this.etcdClient, conf.getNetworkMappingName())); + // The first request may cost too much latency + long originTimeout = this.timeout; + this.timeout = 2000; for (URI u : pdAddrs) { resp = getMembers(u); if (resp != null) { @@ -575,6 +579,7 @@ private void initCluster() { } logger.info("Could not get leader member with pd: " + u); } + this.timeout = originTimeout; checkNotNull(resp, "Failed to init client for PD cluster."); long clusterId = resp.getHeader().getClusterId(); header = RequestHeader.newBuilder().setClusterId(clusterId).build(); From a722b19141072e537fa54508b7bfba1682496688 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 20:14:29 +0800 Subject: [PATCH 09/10] do not invalid store Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/PDClient.java | 2 +- .../org/tikv/common/region/AbstractRegionStoreClient.java | 6 ++---- src/main/java/org/tikv/common/region/RegionManager.java | 5 ----- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index d093edb46dd..9de9d857b35 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -336,7 +336,7 @@ private GetMembersResponse getMembers(URI uri) { } return resp; } catch (Exception e) { - logger.warn("failed to get member from pd server.", e); + logger.debug("failed to get member from pd server.", e); } return null; } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 
90eed038630..4748a51070e 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -116,7 +116,7 @@ public boolean onNotLeader(TiRegion newRegion) { @Override public boolean onStoreUnreachable() { if (!conf.getEnableGrpcForward()) { - regionManager.onRequestFail(region, targetStore); + regionManager.onRequestFail(region); return false; } if (targetStore.getProxyStore() == null) { @@ -130,9 +130,7 @@ public boolean onStoreUnreachable() { String.format( "retry time exceed for region[%d], invalid this region and store[%d]", region.getId(), targetStore.getId())); - if (originStore != null) { - regionManager.onRequestFail(region, originStore); - } + regionManager.onRequestFail(region); return false; } TiStore proxyStore = switchProxyStore(); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index cb763b8a484..662b8460e59 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -241,11 +241,6 @@ public synchronized void onRequestFail(TiRegion region) { cache.invalidateRegion(region); } - public synchronized void onRequestFail(TiRegion region, TiStore store) { - cache.invalidateRegion(region); - cache.invalidateAllRegionForStore(store); - } - public void invalidateStore(long storeId) { cache.invalidateStore(storeId); } From 5c46b48de465b1241590d36e8e4e3853315ae98f Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 30 Jun 2021 20:17:13 +0800 Subject: [PATCH 10/10] fix store reachable Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/region/TiStore.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index 17ef70935d0..f6c7fc80ebb 100644 --- 
a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -14,14 +14,14 @@ public TiStore(Metapb.Store store) { this.proxyStore = null; } - private TiStore(Metapb.Store store, Metapb.Store proxyStore, boolean unreachable) { + private TiStore(Metapb.Store store, Metapb.Store proxyStore) { this.store = store; - this.unreachable = new AtomicBoolean(unreachable); + this.unreachable = new AtomicBoolean(false); this.proxyStore = proxyStore; } public TiStore withProxy(Metapb.Store proxyStore) { - return new TiStore(this.store, proxyStore, this.unreachable.get()); + return new TiStore(this.store, proxyStore); } public void markUnreachable() {