From 79d13242e18f934792129fb77de08161e7e3d018 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 23 Dec 2021 16:03:26 +0800 Subject: [PATCH 1/3] fix seekLeaderStore backoffer Signed-off-by: marsishandsome --- .../common/operation/RegionErrorHandler.java | 2 +- .../region/AbstractRegionStoreClient.java | 18 +++++++++--------- .../common/region/RegionErrorReceiver.java | 2 +- .../org/tikv/common/region/RegionManager.java | 17 +++++++++++------ 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index d18954e6350..11da87cb8bd 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -76,7 +76,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // onNotLeader is only needed when updateLeader succeeds, thus switch // to a new store address. TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = newRegion != null && recv.onNotLeader(newRegion); + retry = newRegion != null && recv.onNotLeader(newRegion, backOffer); backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; } else { diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index cfa2eb27e70..ddbfb37b9bc 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -108,7 +108,7 @@ public void close() throws GrpcException {} * @return false when re-split is needed. */ @Override - public boolean onNotLeader(TiRegion newRegion) { + public boolean onNotLeader(TiRegion newRegion, BackOffer backOffer) { if (logger.isDebugEnabled()) { logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId()); } @@ -123,7 +123,7 @@ public boolean onNotLeader(TiRegion newRegion) { store = null; } region = newRegion; - store = regionManager.getStoreById(region.getLeader().getStoreId()); + store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer); updateClientStub(); return true; } @@ -193,10 +193,10 @@ private Boolean seekLeaderStore(BackOffer backOffer) { logger.info(String.format("try switch leader: region[%d]", region.getId())); - Metapb.Peer peer = switchLeaderStore(); + Metapb.Peer peer = switchLeaderStore(backOffer); if (peer != null) { // we found a leader - TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId(), backOffer); if (currentLeaderStore.isReachable()) { logger.info( String.format( @@ -232,7 +232,7 @@ private boolean seekProxyStore(BackOffer backOffer) { try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached - TiStore storeWithProxy = switchProxyStore(); + TiStore storeWithProxy = switchProxyStore(backOffer); if (storeWithProxy == null) { // no store available, retry logger.warn(String.format("No store available, retry: region[%d]", region.getId())); @@ -250,11 +250,11 @@ private boolean seekProxyStore(BackOffer backOffer) { } // first: leader peer, second: true if any responses returned with grpc error - private Metapb.Peer switchLeaderStore() { + private Metapb.Peer switchLeaderStore(BackOffer backOffer) { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); @@ -300,12 +300,12 @@ private Metapb.Peer switchLeaderStore() { } } - private TiStore switchProxyStore() { + private TiStore switchProxyStore(BackOffer backOffer) { long forwardTimeout = conf.getForwardTimeout(); List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId(), backOffer); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index e0a3dce930c..667ed69a092 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -20,7 +20,7 @@ import org.tikv.common.util.BackOffer; public interface RegionErrorReceiver { - boolean onNotLeader(TiRegion region); + boolean onNotLeader(TiRegion region, BackOffer backOffer); /// return whether we need to retry this request. boolean onStoreUnreachable(BackOffer backOffer); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 0d22f430550..d0c0f50ddc0 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -254,13 +254,18 @@ public TiStore getStoreById(long id) { } public TiStore getStoreById(long id, BackOffer backOffer) { - TiStore store = getStoreByIdWithBackOff(id, backOffer); - if (store == null) { - logger.warn(String.format("failed to fetch store %d, the store may be missing", id)); - cache.clearAll(); - throw new InvalidStoreException(id); + SlowLogSpan span = backOffer.getSlowLog().start("getStoreById"); + try { + TiStore store = getStoreByIdWithBackOff(id, backOffer); + if (store == null) { + logger.warn(String.format("failed to fetch store %d, the store may be missing", id)); + cache.clearAll(); + throw new InvalidStoreException(id); + } + return store; + } finally { + span.end(); } - return store; } public void onRegionStale(TiRegion region) { From 1933ca1c9755040b5c4022c93a9ffc2b5f017c20 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 24 Dec 2021 13:36:29 +0800 Subject: [PATCH 2/3] address code review Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/TiSession.java | 18 ++++++++---------- .../tikv/common/region/RegionStoreClient.java | 2 +- .../tikv/common/util/ConcreteBackOffer.java | 4 ++-- src/test/java/org/tikv/BaseRawKVTest.java | 1 + 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index ac84daec017..aec955e414a 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -161,16 +161,16 @@ private static VersionInfo getVersionInfo() { } private synchronized void warmUp() { - long warmUpStartTime = System.currentTimeMillis(); + long warmUpStartTime = System.nanoTime(); + BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); try { this.client = getPDClient(); this.regionManager = getRegionManager(); - List stores = this.client.getAllStores(ConcreteBackOffer.newGetBackOff()); + List stores = this.client.getAllStores(backOffer); // warm up store cache for (Metapb.Store store : stores) { this.regionManager.updateStore( - null, - new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId()))); + null, new TiStore(this.client.getStore(backOffer, store.getId()))); } // use scan region to load region cache with limit @@ -178,17 +178,14 @@ private synchronized void warmUp() { do { List regions = regionManager.scanRegions( - ConcreteBackOffer.newGetBackOff(), - startKey, - ByteString.EMPTY, - conf.getScanRegionsLimit()); + backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit()); if (regions == null || regions.isEmpty()) { // something went wrong, but the warm-up process could continue break; } for (Pdpb.Region region : regions) { regionManager.insertRegionToCache( - regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); + regionManager.createRegion(region.getRegion(), backOffer)); } startKey = regions.get(regions.size() - 1).getRegion().getEndKey(); } while (!startKey.isEmpty()); @@ -211,7 +208,8 @@ private synchronized void warmUp() { logger.info("warm up fails, ignored ", e); } finally { logger.info( - String.format("warm up duration %d ms", System.currentTimeMillis() - warmUpStartTime)); + String.format( + "warm up duration %d ms", (System.nanoTime() - warmUpStartTime) / 1_000_000)); } } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 4b6452f8ac8..2dcec386598 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -274,7 +274,7 @@ public List scan( boolean forWrite = false; while (true) { // we should refresh region - region = regionManager.getRegionByKey(startKey); + region = regionManager.getRegionByKey(startKey, backOffer); Supplier request = () -> diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 355c9ff6023..50abeae00fc 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -168,8 +168,6 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) { } public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { - SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); - Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); BackOffFunction backOffFunction = backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc); @@ -185,6 +183,8 @@ public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long } } + Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); + SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name()); try { Thread.sleep(sleep); } catch (InterruptedException e) { diff --git a/src/test/java/org/tikv/BaseRawKVTest.java b/src/test/java/org/tikv/BaseRawKVTest.java index 7b6fa1aefa0..b3e87467906 100644 --- a/src/test/java/org/tikv/BaseRawKVTest.java +++ b/src/test/java/org/tikv/BaseRawKVTest.java @@ -14,6 +14,7 @@ protected TiConfiguration createTiConfiguration() { conf.setTest(true); conf.setEnableAtomicForCAS(true); conf.setEnableGrpcForward(false); + conf.setEnableAtomicForCAS(true); return conf; } } From d29bde68905ddebe272601f00e47cb52ab9cb3fd Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Fri, 24 Dec 2021 13:55:42 +0800 Subject: [PATCH 3/3] remove getStoreById slowlog Signed-off-by: marsishandsome --- .../org/tikv/common/region/RegionManager.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index d0c0f50ddc0..0d22f430550 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -254,18 +254,13 @@ public TiStore getStoreById(long id) { } public TiStore getStoreById(long id, BackOffer backOffer) { - SlowLogSpan span = backOffer.getSlowLog().start("getStoreById"); - try { - TiStore store = getStoreByIdWithBackOff(id, backOffer); - if (store == null) { - logger.warn(String.format("failed to fetch store %d, the store may be missing", id)); - cache.clearAll(); - throw new InvalidStoreException(id); - } - return store; - } finally { - span.end(); + TiStore store = getStoreByIdWithBackOff(id, backOffer); + if (store == null) { + logger.warn(String.format("failed to fetch store %d, the store may be missing", id)); + cache.clearAll(); + throw new InvalidStoreException(id); } + return store; } public void onRegionStale(TiRegion region) {