From dcaea8c1c14cd374bfd64e40c58fb03726d5cad7 Mon Sep 17 00:00:00 2001 From: Liangliang Gu Date: Fri, 24 Dec 2021 15:05:35 +0800 Subject: [PATCH] cherry pick #434 to release-3.1 Signed-off-by: ti-srebot Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/TiSession.java | 14 ++++++++------ .../common/operation/RegionErrorHandler.java | 2 +- .../region/AbstractRegionStoreClient.java | 18 +++++++++--------- .../common/region/RegionErrorReceiver.java | 2 +- .../tikv/common/region/RegionStoreClient.java | 2 +- .../tikv/common/util/ConcreteBackOffer.java | 4 ++-- 6 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index f46479df7ec..3fab40ccc56 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -92,21 +92,22 @@ public TiSession(TiConfiguration conf) { } private synchronized void warmUp() { - long warmUpStartTime = System.currentTimeMillis(); + long warmUpStartTime = System.nanoTime(); + BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); + try { this.client = getPDClient(); this.regionManager = getRegionManager(); - List stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff()); + List stores = this.client.getAllStores(backOffer); // warm up store cache for (Metapb.Store store : stores) { this.regionManager.updateStore( - null, - new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId()))); + null, new TiStore(this.client.getStore(backOffer, store.getId()))); } ByteString startKey = ByteString.EMPTY; do { - TiRegion region = regionManager.getRegionByKey(startKey); + TiRegion region = regionManager.getRegionByKey(startKey, backOffer); startKey = region.getEndKey(); } while (!startKey.isEmpty()); @@ -121,7 +122,8 @@ private synchronized void warmUp() { logger.info("warm up fails, ignored ", e); } finally { logger.info( - String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime)); + String.format( + "warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000)); } } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 74a78cd6b0c..42e2818b198 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // onNotLeader is only needed when updateLeader succeeds, thus switch // to a new store address. TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = newRegion != null && recv.onNotLeader(newRegion); + retry = newRegion != null && recv.onNotLeader(newRegion, backOffer); backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; } else { diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index cfa2eb27e70..ddbfb37b9bc 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -108,7 +108,7 @@ public void close() throws GrpcException {} * @return false when re-split is needed. */ @Override - public boolean onNotLeader(TiRegion newRegion) { + public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) { if (logger.isDebugEnabled()) { logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId()); } @@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) { store = null; } region = newRegion; - store = regionManager.getStoreById(region.getLeader().getStoreId()); + store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer); updateClientStub(); return true; } @@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) { logger.info(String.format("try switch leader: region[%d]", region.getId())); - Metapb.Peer peer = switchLeaderStore(); + Metapb.Peer peer = switchLeaderStore(backOffer); if (peer != null) { // we found a leader - TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer); if (currentLeaderStore.isReachable()) { logger.info( String.format( @@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) { try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached - TiStore storeWithProxy = switchProxyStore(); + TiStore storeWithProxy = switchProxyStore(backOffer); if (storeWithProxy == null) { // no store available, retry logger.warn(String.format("No store available, retry: region[%d]", region.getId())); @@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) { } // first: leader peer, second: true if any responses returned with grpc error - private Metapb.Peer switchLeaderStore() { + private Metapb.Peer switchLeaderStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); @@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() { } } - private TiStore switchProxyStore() { + private TiStore switchProxyStore(BackOffer backOffer) { long forwardTimeout = conf.getForwardTimeout(); List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index e0a3dce930c..667ed69a092 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -20,7 +20,7 @@ import org.tikv.common.util.BackOffer; public interface RegionErrorReceiver { - boolean onNotLeader(TiRegion region); + boolean onNotLeader(TiRegion region, BackOffer backOffer); /// return whether we need to retry this request. boolean onStoreUnreachable(BackOffer backOffer); diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index ab4d06e9874..992ff5c101e 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -274,7 +274,7 @@ public List scan( boolean forWrite = false; while (true) { // we should refresh region - region = regionManager.getRegionByKey(startKey); + region = regionManager.getRegionByKey(startKey, backOffer); Supplier request = () -> diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 355c9ff6023..50abeae00fc 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -168,8 +168,6 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { } public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { - SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); - Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); BackOffFunction backOffFunction = backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc); @@ -185,6 +183,8 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long } } + Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); + SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); try { Thread.sleep(sleep); } catch (InterruptedException e) {