From 2fc83bada5d4772107535dc0855c4d433fda9229 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 23 Dec 2021 16:03:26 +0800 Subject: [PATCH 1/2] fix seekLeaderStore backoffer Signed-off-by: marsishandsome --- .../common/operation/RegionErrorHandler.java | 2 +- .../region/AbstractRegionStoreClient.java | 18 +++++++++--------- .../common/region/RegionErrorReceiver.java | 2 +- .../org/tikv/common/region/RegionManager.java | 17 +++++++++++------ 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 74a78cd6b0c..42e2818b198 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // onNotLeader is only needed when updateLeader succeeds, thus switch // to a new store address. TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = newRegion != null && recv.onNotLeader(newRegion); + retry = newRegion != null && recv.onNotLeader(newRegion, backOffer); backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; } else { diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index cfa2eb27e70..ddbfb37b9bc 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -108,7 +108,7 @@ public void close() throws GrpcException {} * @return false when re-split is needed. */ @Override - public boolean onNotLeader(TiRegion newRegion) { + public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) { if (logger.isDebugEnabled()) { logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId()); } @@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) { store = null; } region = newRegion; - store = regionManager.getStoreById(region.getLeader().getStoreId()); + store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer); updateClientStub(); return true; } @@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) { logger.info(String.format("try switch leader: region[%d]", region.getId())); - Metapb.Peer peer = switchLeaderStore(); + Metapb.Peer peer = switchLeaderStore(backOffer); if (peer != null) { // we found a leader - TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer); if (currentLeaderStore.isReachable()) { logger.info( String.format( @@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) { try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached - TiStore storeWithProxy = switchProxyStore(); + TiStore storeWithProxy = switchProxyStore(backOffer); if (storeWithProxy == null) { // no store available, retry logger.warn(String.format("No store available, retry: region[%d]", region.getId())); @@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) { } // first: leader peer, second: true if any responses returned with grpc error - private Metapb.Peer switchLeaderStore() { + private Metapb.Peer switchLeaderStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); @@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() { } } - private TiStore switchProxyStore() { + private TiStore switchProxyStore(BackOffer backOffer) { long forwardTimeout = conf.getForwardTimeout(); List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index e0a3dce930c..667ed69a092 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -20,7 +20,7 @@ import org.tikv.common.util.BackOffer; public interface RegionErrorReceiver { - boolean onNotLeader(TiRegion region); + boolean onNotLeader(TiRegion region, BackOffer backOffer); /// return whether we need to retry this request. boolean onStoreUnreachable(BackOffer backOffer); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 24b9acabe70..267eacb1ddb 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -228,13 +228,18 @@ public TiStore getStoreById(long id) { } public TiStore getStoreById(long id, BackOffer backOffer) { - TiStore store = getStoreByIdWithBackOff(id, backOffer); - if (store == null) { - logger.warn(String.format("failed to fetch store %d, the store may be missing", id)); - cache.clearAll(); - throw new InvalidStoreException(id); + SlowLogSpan span = backOffer.getSlowLog().start("getStoreById"); + try { + TiStore store = getStoreByIdWithBackOff(id, backOffer); + if (store == null) { + logger.warn(String.format("failed to fetch store %d, the store may be missing", id)); + cache.clearAll(); + throw new InvalidStoreException(id); + } + return store; + } finally { + span.end(); } - return store; } public void onRegionStale(TiRegion region) { From 919f5c8d2b81e0ef0b6a52b551d6bf7267e44237 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 23 Dec 2021 16:28:33 +0800 Subject: [PATCH 2/2] continue fix Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/TiSession.java | 38 ++++++++++++++----- .../tikv/common/region/RegionStoreClient.java | 2 +- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index f46479df7ec..260d9f1a99c 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -93,23 +93,46 @@ public TiSession(TiConfiguration conf) { private synchronized void warmUp() { long warmUpStartTime = System.currentTimeMillis(); + warmUpStore(); + warmUpRegion(); + warmUpGrpc(); + logger.info( + String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime)); + } + + private void warmUpStore() { try { + BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); this.client = getPDClient(); this.regionManager = getRegionManager(); - List stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff()); + List stores = this.client.getAllStores(backOffer); // warm up store cache for (Metapb.Store store : stores) { this.regionManager.updateStore( - null, - new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId()))); + null, new TiStore(this.client.getStore(backOffer, store.getId()))); } - ByteString startKey = ByteString.EMPTY; + } catch (Exception e) { + // ignore error + logger.info("warm up store fails, ignored ", e); + } + } + private void warmUpRegion() { + try { + BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); + ByteString startKey = ByteString.EMPTY; do { - TiRegion region = regionManager.getRegionByKey(startKey); + TiRegion region = regionManager.getRegionByKey(startKey, backOffer); startKey = region.getEndKey(); } while (!startKey.isEmpty()); + } catch (Exception e) { + // ignore error + logger.info("warm up region fails, ignored ", e); + } + } + private void warmUpGrpc() { + try { RawKVClient rawKVClient = createRawClient(); ByteString exampleKey = ByteString.EMPTY; ByteString prev = rawKVClient.get(exampleKey); @@ -118,10 +141,7 @@ private synchronized void warmUp() { rawKVClient.put(exampleKey, prev); } catch (Exception e) { // ignore error - logger.info("warm up fails, ignored ", e); - } finally { - logger.info( - String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime)); + logger.info("warm up grpc fails, ignored ", e); } } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index ab4d06e9874..992ff5c101e 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -274,7 +274,7 @@ public List scan( boolean forWrite = false; while (true) { // we should refresh region - region = regionManager.getRegionByKey(startKey); + region = regionManager.getRegionByKey(startKey, backOffer); Supplier request = () ->