diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index ac84daec017..aec955e414a 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -161,16 +161,16 @@ private static VersionInfo getVersionInfo() { } private synchronized void warmUp() { - long warmUpStartTime = System.currentTimeMillis(); + long warmUpStartTime = System.nanoTime(); + BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); try { this.client = getPDClient(); this.regionManager = getRegionManager(); - List stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff()); + List stores = this.client.getAllStores(backOffer); // warm up store cache for (Metapb.Store store : stores) { this.regionManager.updateStore( - null, - new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId()))); + null, new TiStore(this.client.getStore(backOffer, store.getId()))); } // use scan region to load region cache with limit @@ -178,17 +178,14 @@ private synchronized void warmUp() { do { List regions = regionManager.scanRegions( - ConcreteBackOffer.newGetBackOff(), - startKey, - ByteString.EMPTY, - conf.getScanRegionsLimit()); + backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit()); if (regions == null || regions.isEmpty()) { // something went wrong, but the warm-up process could continue break; } for (Pdpb.Region region : regions) { regionManager.insertRegionToCache( - regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); + regionManager.createRegion(region.getRegion(), backOffer)); } startKey = regions.get(regions.size() - 1).getRegion().getEndKey(); } while (!startKey.isEmpty()); @@ -211,7 +208,8 @@ private synchronized void warmUp() { logger.info("warm up fails, ignored ", e); } finally { logger.info( - String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime)); + String.format( + "warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000)); } } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index d18954e6350..11da87cb8bd 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // onNotLeader is only needed when updateLeader succeeds, thus switch // to a new store address. TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = newRegion != null && recv.onNotLeader(newRegion); + retry = newRegion != null && recv.onNotLeader(newRegion, backOffer); backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; } else { diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index cfa2eb27e70..ddbfb37b9bc 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -108,7 +108,7 @@ public void close() throws GrpcException {} * @return false when re-split is needed. */ @Override - public boolean onNotLeader(TiRegion newRegion) { + public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) { if (logger.isDebugEnabled()) { logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId()); } @@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) { store = null; } region = newRegion; - store = regionManager.getStoreById(region.getLeader().getStoreId()); + store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer); updateClientStub(); return true; } @@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) { logger.info(String.format("try switch leader: region[%d]", region.getId())); - Metapb.Peer peer = switchLeaderStore(); + Metapb.Peer peer = switchLeaderStore(backOffer); if (peer != null) { // we found a leader - TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer); if (currentLeaderStore.isReachable()) { logger.info( String.format( @@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) { try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached - TiStore storeWithProxy = switchProxyStore(); + TiStore storeWithProxy = switchProxyStore(backOffer); if (storeWithProxy == null) { // no store available, retry logger.warn(String.format("No store available, retry: region[%d]", region.getId())); @@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) { } // first: leader peer, second: true if any responses returned with grpc error - private Metapb.Peer switchLeaderStore() { + private Metapb.Peer switchLeaderStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); @@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() { } } - private TiStore switchProxyStore() { + private TiStore switchProxyStore(BackOffer backOffer) { long forwardTimeout = conf.getForwardTimeout(); List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index e0a3dce930c..667ed69a092 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -20,7 +20,7 @@ import org.tikv.common.util.BackOffer; public interface RegionErrorReceiver { - boolean onNotLeader(TiRegion region); + boolean onNotLeader(TiRegion region, BackOffer backOffer); /// return whether we need to retry this request. boolean onStoreUnreachable(BackOffer backOffer); diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 4b6452f8ac8..2dcec386598 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -274,7 +274,7 @@ public List scan( boolean forWrite = false; while (true) { // we should refresh region - region = regionManager.getRegionByKey(startKey); + region = regionManager.getRegionByKey(startKey, backOffer); Supplier request = () -> diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 355c9ff6023..50abeae00fc 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -168,8 +168,6 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { } public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { - SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); - Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); BackOffFunction backOffFunction = backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc); @@ -185,6 +183,8 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long } } + Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); + SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); try { Thread.sleep(sleep); } catch (InterruptedException e) { diff --git a/src/test/java/org/tikv/BaseRawKVTest.java b/src/test/java/org/tikv/BaseRawKVTest.java index 7b6fa1aefa0..b3e87467906 100644 --- a/src/test/java/org/tikv/BaseRawKVTest.java +++ b/src/test/java/org/tikv/BaseRawKVTest.java @@ -14,6 +14,7 @@ protected TiConfiguration createTiConfiguration() { conf.setTest(true); conf.setEnableAtomicForCAS(true); conf.setEnableGrpcForward(false); + conf.setEnableAtomicForCAS(true); return conf; } }