From 371bcdc89e27ca7bce8f5c8f2cb414175e51ab7f Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 6 Jul 2021 15:45:52 +0800 Subject: [PATCH 1/5] refactor region cache Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/PDClient.java | 8 ++ .../org/tikv/common/region/RegionManager.java | 132 ++++++++++-------- 2 files changed, 81 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 9de9d857b35..fcd9e5ef574 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -93,6 +93,7 @@ public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync"; + private static final long MIN_TRY_UPDATE_DURATION = 50; private final Logger logger = LoggerFactory.getLogger(PDClient.class); private RequestHeader header; private TsoRequest tsoReq; @@ -103,6 +104,7 @@ public class PDClient extends AbstractGRPCClient private Client etcdClient; private ConcurrentMap tiflashReplicaMap; private HostMapping hostMapping; + private long lastUpdateLeaderTime; public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = Histogram.build() @@ -392,6 +394,9 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l } public synchronized void updateLeaderOrforwardFollower() { + if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { + return; + } for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. GetMembersResponse resp = getMembers(url); @@ -407,6 +412,7 @@ public synchronized void updateLeaderOrforwardFollower() { // if leader is switched, just return. if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + lastUpdateLeaderTime = System.currentTimeMillis(); return; } @@ -441,6 +447,7 @@ public synchronized void updateLeaderOrforwardFollower() { } } } + lastUpdateLeaderTime = System.currentTimeMillis(); if (pdClientWrapper == null) { throw new TiClientInternalException( "already tried all address on file, but not leader found yet."); @@ -470,6 +477,7 @@ public void tryUpdateLeader() { return; } } + lastUpdateLeaderTime = System.currentTimeMillis(); if (pdClientWrapper == null) { throw new TiClientInternalException( "already tried all address on file, but not leader found yet."); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 662b8460e59..974884963a2 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -55,6 +55,8 @@ public class RegionManager { // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 private final RegionCache cache; + private final ReadOnlyPDClient pdClient; + private final TiConfiguration conf; private final ScheduledExecutorService executor; private final UnreachableStoreChecker storeChecker; @@ -72,7 +74,9 @@ public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) { - this.cache = new RegionCache(conf, pdClient); + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; this.cacheInvalidateCallback = cacheInvalidateCallback; this.executor = null; this.storeChecker = null; @@ -84,8 +88,11 @@ public RegionManager( Function cacheInvalidateCallback, ChannelFactory channelFactory, boolean enableGrpcForward) { - this.cache = new RegionCache(conf, pdClient); + this.cache = new RegionCache(); this.cacheInvalidateCallback = cacheInvalidateCallback; + this.pdClient = pdClient; + this.conf = conf; + if (enableGrpcForward) { UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient); this.storeChecker = storeChecker; @@ -98,7 +105,9 @@ public RegionManager( } public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { - this.cache = new RegionCache(conf, pdClient); + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; this.cacheInvalidateCallback = null; this.storeChecker = null; this.executor = null; @@ -115,7 +124,7 @@ public Function getCacheInvalidateCallback() { } public ReadOnlyPDClient getPDClient() { - return this.cache.pdClient; + return this.pdClient; } public TiRegion getRegionByKey(ByteString key) { @@ -134,7 +143,15 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { // Consider region A, B. After merge of (A, B) -> A, region ID B does not exist. // This request is unrecoverable. public TiRegion getRegionById(long regionId) { - return cache.getRegionById(ConcreteBackOffer.newGetBackOff(), regionId); + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + TiRegion region = cache.getRegionById(regionId); + if (region == null) { + Pair regionAndLeader = + pdClient.getRegionByID(backOffer, regionId); + region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); + return cache.putRegion(region); + } + return region; } public Pair getRegionStorePairByKey(ByteString key, BackOffer backOffer) { @@ -153,7 +170,10 @@ public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { TiRegion region = cache.getRegionByKey(key, backOffer); if (region == null) { - throw new TiClientInternalException("Region not exist for key:" + formatBytesUTF8(key)); + logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); + Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key); + region = + cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); } if (!region.isValid()) { throw new TiClientInternalException("Region invalid: " + region.toString()); @@ -162,7 +182,7 @@ public Pair getRegionStorePairByKey( TiStore store = null; if (storeType == TiStoreType.TiKV) { Peer peer = region.getCurrentReplica(); - store = cache.getStoreById(peer.getStoreId(), backOffer); + store = getStoreById(peer.getStoreId(), backOffer); if (store == null) { cache.clearAll(); } @@ -192,12 +212,33 @@ public Pair getRegionStorePairByKey( return Pair.create(region, store); } - public TiStore getStoreById(long id) { - return getStoreById(id, ConcreteBackOffer.newGetBackOff()); + private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { + List peers = region.getPeersList(); + List stores = getRegionStore(peers, backOffer); + return new TiRegion(conf, region, leader, peers, stores); + } + + private List getRegionStore(List peers, BackOffer backOffer) { + return peers.stream().map(p -> getStoreById(p.getStoreId())).collect(Collectors.toList()); } public TiStore getStoreById(long id, BackOffer backOffer) { - return cache.getStoreById(id, backOffer); + try { + TiStore store = cache.getStoreById(id); + if (store == null) { + store = new TiStore(pdClient.getStore(backOffer, id)); + } + if (store.getStore().getState().equals(StoreState.Tombstone)) { + return null; + } + return cache.putStore(id, store); + } catch (Exception e) { + throw new GrpcException(e); + } + } + + public TiStore getStoreById(long id) { + return getStoreById(id, ConcreteBackOffer.newGetBackOff()); } public void onRegionStale(TiRegion region) { @@ -253,16 +294,12 @@ public static class RegionCache { private final Map regionCache; private final Map storeCache; private final RangeMap keyToRegionIdCache; - private final ReadOnlyPDClient pdClient; - private final TiConfiguration conf; - public RegionCache(TiConfiguration conf, ReadOnlyPDClient pdClient) { + public RegionCache() { regionCache = new HashMap<>(); storeCache = new HashMap<>(); keyToRegionIdCache = TreeRangeMap.create(); - this.conf = conf; - this.pdClient = pdClient; } public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { @@ -281,14 +318,7 @@ public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) } if (regionId == null) { - logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); - Pair regionAndLeader = - pdClient.getRegionByKey(backOffer, key); - TiRegion region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); - if (!putRegion(region)) { - throw new TiClientInternalException("Invalid Region: " + region.toString()); - } - return region; + return null; } TiRegion region; region = regionCache.get(regionId); @@ -302,29 +332,29 @@ public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) } } - private synchronized boolean putRegion(TiRegion region) { + private synchronized TiRegion putRegion(TiRegion region) { if (logger.isDebugEnabled()) { logger.debug("putRegion: " + region); } + TiRegion oldRegion = regionCache.get(region.getId()); + if (oldRegion != null) { + if (oldRegion.getMeta().equals(region.getMeta())) { + return oldRegion; + } else { + invalidateRegion(oldRegion); + } + } regionCache.put(region.getId(), region); keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId()); - return true; + return region; } @Deprecated - private synchronized TiRegion getRegionById(BackOffer backOffer, long regionId) { + private synchronized TiRegion getRegionById(long regionId) { TiRegion region = regionCache.get(regionId); if (logger.isDebugEnabled()) { logger.debug(String.format("getRegionByKey ID[%s] -> Region[%s]", regionId, region)); } - if (region == null) { - Pair regionAndLeader = - pdClient.getRegionByID(backOffer, regionId); - region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); - if (!putRegion(region)) { - throw new TiClientInternalException("Invalid Region: " + region.toString()); - } - } return region; } @@ -412,33 +442,17 @@ public synchronized void invalidateStore(long storeId) { } } - public synchronized TiStore getStoreById(long id, BackOffer backOffer) { - try { - TiStore store = storeCache.get(id); - if (store == null) { - store = new TiStore(pdClient.getStore(backOffer, id)); - } - if (store.getStore().getState().equals(StoreState.Tombstone)) { - return null; - } - storeCache.put(id, store); - return store; - } catch (Exception e) { - throw new GrpcException(e); - } + public synchronized TiStore getStoreById(long id) { + return storeCache.get(id); } - private List getRegionStore(List peers, BackOffer backOffer) { - return peers - .stream() - .map(p -> getStoreById(p.getStoreId(), backOffer)) - .collect(Collectors.toList()); - } - - private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) { - List peers = region.getPeersList(); - List stores = getRegionStore(peers, backOffer); - return new TiRegion(conf, region, leader, peers, stores); + public synchronized TiStore putStore(long id, TiStore store) { + TiStore oldStore = storeCache.get(id); + if (oldStore != null && oldStore.getStore().equals(store.getStore())) { + return oldStore; + } + storeCache.put(id, store); + return store; } public synchronized void clearAll() { From 76b4c5a1cd4c2b535ffdb6ced33142ef547c02cc Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 6 Jul 2021 16:17:55 +0800 Subject: [PATCH 2/5] fix not leader Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/region/RegionManager.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 974884963a2..50f08a251fe 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -383,13 +383,15 @@ public synchronized boolean updateRegion(TiRegion expected, TiRegion region) { logger.debug(String.format("invalidateRegion ID[%s]", region.getId())); } TiRegion oldRegion = regionCache.get(region.getId()); - if (expected != oldRegion) { + if (!expected.getMeta().equals(oldRegion.getMeta())) { return false; } else { if (oldRegion != null) { keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey())); } - putRegion(region); + regionCache.put(region.getId(), region); + keyToRegionIdCache.put( + makeRange(region.getStartKey(), region.getEndKey()), region.getId()); return true; } } catch (Exception ignore) { From fdbbaa566e6b2db7fab6388ded6cbc0833219dbc Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 6 Jul 2021 18:16:02 +0800 Subject: [PATCH 3/5] fix not leader and log Signed-off-by: Little-Wallace --- .../org/tikv/common/operation/RegionErrorHandler.java | 2 +- src/main/java/org/tikv/common/policy/RetryPolicy.java | 2 ++ .../org/tikv/common/region/AbstractRegionStoreClient.java | 8 +++++++- src/main/java/org/tikv/common/util/ConcreteBackOffer.java | 2 +- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 32b0a3cf2e1..608f631d911 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -36,7 +36,6 @@ public RegionErrorHandler( public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (resp == null) { String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); - logger.warn(msg); return handleRequestError(backOffer, new GrpcException(msg)); } // Region error handling logic @@ -171,6 +170,7 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) { return true; } + logger.warn("request failed because of: " + e.getMessage()); backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoTiKVRPC, new GrpcException( diff --git a/src/main/java/org/tikv/common/policy/RetryPolicy.java b/src/main/java/org/tikv/common/policy/RetryPolicy.java index 4c78f66a302..27d8558c7b9 100644 --- a/src/main/java/org/tikv/common/policy/RetryPolicy.java +++ b/src/main/java/org/tikv/common/policy/RetryPolicy.java @@ -79,6 +79,8 @@ public RespT callWithRetry(Callable proc, String methodName) { if (retry) { GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); continue; + } else { + return result; } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 4748a51070e..49740c56fe7 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -105,6 +105,7 @@ public boolean onNotLeader(TiRegion newRegion) { } region = newRegion; targetStore = regionManager.getStoreById(region.getLeader().getStoreId()); + originStore = null; String addressStr = targetStore.getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); @@ -128,7 +129,7 @@ public boolean onStoreUnreachable() { } else if (retryTimes > region.getFollowerList().size()) { logger.warn( String.format( - "retry time exceed for region[%d], invalid this region and store[%d]", + "retry time exceed for region[%d], invalid this region[%d]", region.getId(), targetStore.getId())); regionManager.onRequestFail(region); return false; @@ -139,6 +140,7 @@ public boolean onStoreUnreachable() { String.format( "no forward store can be selected for store [%s] and region[%d]", targetStore.getStore().getAddress(), region.getId())); + regionManager.onRequestFail(region); return false; } if (originStore == null) { @@ -168,6 +170,10 @@ public boolean onStoreUnreachable() { @Override protected void tryUpdateProxy() { if (originStore != null) { + logger.warn( + String.format( + "update store [%s] by proxy-store [%s]", + targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress())); regionManager.updateStore(originStore, targetStore); } } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index eff368368df..b248d233d8a 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -105,7 +105,7 @@ private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcTy backOffFunction = BackOffFunction.create(500, 3000, BackOffStrategy.EqualJitter); break; case BoTiKVRPC: - backOffFunction = BackOffFunction.create(100, 2000, BackOffStrategy.EqualJitter); + backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter); break; case BoTxnNotFound: backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter); From 45c717b88010e4bd3ddbaa33e645a4e0d1c7bcb5 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 6 Jul 2021 19:40:24 +0800 Subject: [PATCH 4/5] fix some parameter Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/ConfigUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 1ffe00c5d4d..5938d56a396 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -55,10 +55,10 @@ public class ConfigUtils { public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; - public static final String DEF_TIMEOUT = "150ms"; + public static final String DEF_TIMEOUT = "300ms"; public static final String DEF_FORWARD_TIMEOUT = "600ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; - public static final int DEF_CHECK_HEALTH_TIMEOUT = 40; + public static final int DEF_CHECK_HEALTH_TIMEOUT = 100; public static final int DEF_SCAN_BATCH_SIZE = 10240; public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000; From 35bfc5b0e7a2cd0ff95de64d8a735ccc7c68e0d0 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 7 Jul 2021 11:19:31 +0800 Subject: [PATCH 5/5] fix test Signed-off-by: Little-Wallace --- .../org/tikv/common/region/RegionManager.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 50f08a251fe..a565b5cce48 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -132,7 +132,14 @@ public TiRegion getRegionByKey(ByteString key) { } public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { - return cache.getRegionByKey(key, backOffer); + TiRegion region = cache.getRegionByKey(key, backOffer); + if (region == null) { + logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); + Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key); + region = + cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); + } + return region; } @Deprecated @@ -168,13 +175,7 @@ public Pair getRegionStorePairByKey(ByteString key, TiStoreTy public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { - TiRegion region = cache.getRegionByKey(key, backOffer); - if (region == null) { - logger.debug("Key not found in keyToRegionIdCache:" + formatBytesUTF8(key)); - Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key); - region = - cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); - } + TiRegion region = getRegionByKey(key, backOffer); if (!region.isValid()) { throw new TiClientInternalException("Region invalid: " + region.toString()); }