From 56d1518800b53eee9d5cdbade352e48eeb7adf36 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Thu, 22 Sep 2022 10:58:34 +0800 Subject: [PATCH 01/13] Close #654 To let the upper layers customize their own behavior when the region cache fails, Add RegionCacheInvalidCallBack. Signed-off-by: qidi1 <1083369179@qq.com> --- .../common/operation/RegionErrorHandler.java | 36 ++++-- .../org/tikv/common/region/RegionManager.java | 44 +++++++ .../tikv/common/CacheInvalidCallBackTest.java | 122 ++++++++++++++++++ 3 files changed, 194 insertions(+), 8 deletions(-) create mode 100644 src/test/java/org/tikv/common/CacheInvalidCallBackTest.java diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 7e7eff2b9dc..66040414407 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -25,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.codec.KeyUtils; +import org.tikv.common.event.CacheInvalidateEvent; +import org.tikv.common.event.CacheInvalidateEvent.CacheType; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiKVException; import org.tikv.common.region.RegionErrorReceiver; @@ -42,6 +44,7 @@ public class RegionErrorHandler implements ErrorHandler { private final Function getRegionError; private final RegionManager regionManager; private final RegionErrorReceiver recv; + private final Function cacheInvalidateCallBack; public RegionErrorHandler( RegionManager regionManager, @@ -50,6 +53,7 @@ public RegionErrorHandler( this.recv = recv; this.regionManager = regionManager; this.getRegionError = getRegionError; + cacheInvalidateCallBack = regionManager.getCacheInvalidateCallback(); } @Override @@ -106,6 +110,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { if (!retry) { this.regionManager.invalidateRegion(recv.getRegion()); + notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.LEADER); } backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); @@ -121,9 +126,8 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { String.format( "Store Not Match happened with region id %d, store id %d, actual store id %d", recv.getRegion().getId(), storeId, actualStoreId)); - - this.regionManager.invalidateRegion(recv.getRegion()); - this.regionManager.invalidateStore(storeId); + // may request store which is not leader. + invalidateRegionStoreCache(recv.getRegion(), storeId); // assume this is a low probability error, do not retry, just re-split the request by // throwing it out. return false; @@ -142,8 +146,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { BackOffFunction.BackOffFuncType.BoServerBusy, new StatusRuntimeException( Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); return true; } else if (error.hasStaleCommand()) { // this error is reported from raftstore: @@ -161,6 +163,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // key requested is not in current region // should not happen here. ByteString invalidKey = error.getKeyNotInRegion().getKey(); + notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.REGION_STORE); logger.error( String.format( "Key not in region [%s] for key [%s], this error should not happen here.", @@ -172,7 +175,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); // For other errors, we only drop cache here. // Upper level may split this task. - invalidateRegionStoreCache(recv.getRegion()); + invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId()); // retry if raft proposal is dropped, it indicates the store is in the middle of transition if (error.getMessage().contains("Raft ProposalDropped")) { backOffer.doBackOff( @@ -189,6 +192,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { private boolean onRegionEpochNotMatch(BackOffer backOffer, List currentRegions) { if (currentRegions.size() == 0) { this.regionManager.onRegionStale(recv.getRegion()); + notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.REGION_STORE); return false; } @@ -222,6 +226,7 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List c } if (needInvalidateOld) { + notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.REGION_STORE); this.regionManager.onRegionStale(recv.getRegion()); } @@ -264,8 +269,23 @@ public TiRegion getRegion() { return recv.getRegion(); } - private void invalidateRegionStoreCache(TiRegion ctxRegion) { + private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { regionManager.invalidateRegion(ctxRegion); - regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); + regionManager.invalidateStore(storeId); + notifyRegionStoreCacheInvalidate( + ctxRegion.getId(), storeId, CacheInvalidateEvent.CacheType.REGION_STORE); + } + + private void notifyRegionStoreCacheInvalidate( + long regionId, long storeId, CacheInvalidateEvent.CacheType type) { + if (cacheInvalidateCallBack != null) { + cacheInvalidateCallBack.apply(new CacheInvalidateEvent(regionId, storeId, true, true, type)); + } + } + + private void notifyRegionCacheInvalidate(long regionId, CacheInvalidateEvent.CacheType type) { + if (cacheInvalidateCallBack != null) { + cacheInvalidateCallBack.apply(new CacheInvalidateEvent(regionId, 0, true, false, type)); + } } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 44c81375107..2648ff26e2f 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -26,10 +26,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.TiConfiguration; +import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.InvalidStoreException; import org.tikv.common.exception.TiClientInternalException; @@ -68,6 +70,7 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; + private Function cacheInvalidateCallback; public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { @@ -81,6 +84,25 @@ public RegionManager( this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); + this.cacheInvalidateCallback = null; + } + + public RegionManager( + TiConfiguration conf, + ReadOnlyPDClient pdClient, + ChannelFactory channelFactory, + Function cacheInvalidateCallBack) { + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; + long period = conf.getHealthCheckPeriodDuration(); + StoreHealthyChecker storeChecker = + new StoreHealthyChecker( + channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout()); + this.storeChecker = storeChecker; + this.executor = Executors.newScheduledThreadPool(1); + this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); + this.cacheInvalidateCallback = cacheInvalidateCallBack; } public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { @@ -89,6 +111,19 @@ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { this.conf = conf; this.storeChecker = null; this.executor = null; + this.cacheInvalidateCallback = null; + } + + public RegionManager( + TiConfiguration conf, + ReadOnlyPDClient pdClient, + Function cacheInvalidateCallback) { + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; + this.storeChecker = null; + this.executor = null; + this.cacheInvalidateCallback = cacheInvalidateCallback; } public synchronized void close() { @@ -101,6 +136,15 @@ public ReadOnlyPDClient getPDClient() { return this.pdClient; } + public synchronized Function getCacheInvalidateCallback() { + return cacheInvalidateCallback; + } + + public synchronized void setCacheInvalidateCallback( + Function cacheInvalidateCallback) { + this.cacheInvalidateCallback = cacheInvalidateCallback; + } + public void invalidateAll() { cache.invalidateAll(); } diff --git a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java new file mode 100644 index 00000000000..a3a2a683012 --- /dev/null +++ b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import org.junit.Test; +import org.tikv.common.event.CacheInvalidateEvent; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.RegionStoreClient; +import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.Errorpb.EpochNotMatch; +import org.tikv.kvproto.Errorpb.NotLeader; +import org.tikv.kvproto.Errorpb.StoreNotMatch; +import org.tikv.kvproto.Metapb; + +public class CacheInvalidCallBackTest extends MockServerTest { + + private RegionStoreClient createClient( + String version, Function cacheInvalidateCallBack) { + Metapb.Store meta = + Metapb.Store.newBuilder() + .setAddress(LOCAL_ADDR + ":" + port) + .setId(1) + .setState(Metapb.StoreState.Up) + .setVersion(version) + .build(); + TiStore store = new TiStore(meta); + + RegionStoreClientBuilder builder = + new RegionStoreClientBuilder( + session.getConf(), + session.getChannelFactory(), + new RegionManager(session.getConf(), session.getPDClient(), cacheInvalidateCallBack), + session.getPDClient()); + + return builder.build(region, store); + } + + @Test + public void testcacheInvalidCallBack() { + String version = "3.0.12"; + CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack(); + doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack); + } + + public void doRawGetTest( + RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) { + server.put("key1", "value1"); + Optional value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); + assertEquals(ByteString.copyFromUtf8("value1"), value.get()); + try { + server.putError( + "error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance())); + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("error1")); + fail(); + } catch (Exception e) { + assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + + server.putError( + "failure", + () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance())); + try { + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); + fail(); + } catch (Exception e) { + assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.putError( + "store_not_match", + () -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance())); + try { + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); + fail(); + } catch (Exception e) { + assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.clearAllMap(); + client.close(); + } + + private BackOffer defaultBackOff() { + return ConcreteBackOffer.newCustomBackOff(1000); + } + + static class CacheInvalidateCallBack implements Function { + + public List cacheInvalidateEvents = new ArrayList<>(); + + @Override + public Void apply(CacheInvalidateEvent cacheInvalidateEvent) { + cacheInvalidateEvents.add(cacheInvalidateEvent); + return null; + } + } +} From e94cb00709e8cac43cce20f528ce77f267621f24 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Mon, 26 Sep 2022 21:59:47 +0800 Subject: [PATCH 02/13] change callback to list Signed-off-by: qidi1 <1083369179@qq.com> --- .../common/operation/RegionErrorHandler.java | 22 +++++++--- .../org/tikv/common/region/RegionManager.java | 44 +++---------------- .../tikv/common/CacheInvalidCallBackTest.java | 4 +- 3 files changed, 26 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 66040414407..8b506cd7ef2 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -44,7 +44,7 @@ public class RegionErrorHandler implements ErrorHandler { private final Function getRegionError; private final RegionManager regionManager; private final RegionErrorReceiver recv; - private final Function cacheInvalidateCallBack; + private final List> cacheInvalidateCallBackList; public RegionErrorHandler( RegionManager regionManager, @@ -53,7 +53,7 @@ public RegionErrorHandler( this.recv = recv; this.regionManager = regionManager; this.getRegionError = getRegionError; - cacheInvalidateCallBack = regionManager.getCacheInvalidateCallback(); + this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList(); } @Override @@ -278,14 +278,24 @@ private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { private void notifyRegionStoreCacheInvalidate( long regionId, long storeId, CacheInvalidateEvent.CacheType type) { - if (cacheInvalidateCallBack != null) { - cacheInvalidateCallBack.apply(new CacheInvalidateEvent(regionId, storeId, true, true, type)); + if (cacheInvalidateCallBackList != null) { + for (Function cacheInvalidateCallBack : cacheInvalidateCallBackList) { + cacheInvalidateCallBack.apply( + new CacheInvalidateEvent(regionId, storeId, true, true, type)); + } } } private void notifyRegionCacheInvalidate(long regionId, CacheInvalidateEvent.CacheType type) { - if (cacheInvalidateCallBack != null) { - cacheInvalidateCallBack.apply(new CacheInvalidateEvent(regionId, 0, true, false, type)); + if (cacheInvalidateCallBackList != null) { + for (Function cacheInvalidateCallBack : cacheInvalidateCallBackList) { + try { + cacheInvalidateCallBack.apply( + new CacheInvalidateEvent(regionId, 0, true, true, type)); + } catch (Exception e) { + logger.warn(String.format("CacheInvalidCallBack failed %s", e)); + } + } } } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 2648ff26e2f..8958ae42146 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -70,7 +70,7 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; - private Function cacheInvalidateCallback; + private final List> cacheInvalidateCallbackList; public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { @@ -84,25 +84,7 @@ public RegionManager( this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); - this.cacheInvalidateCallback = null; - } - - public RegionManager( - TiConfiguration conf, - ReadOnlyPDClient pdClient, - ChannelFactory channelFactory, - Function cacheInvalidateCallBack) { - this.cache = new RegionCache(); - this.pdClient = pdClient; - this.conf = conf; - long period = conf.getHealthCheckPeriodDuration(); - StoreHealthyChecker storeChecker = - new StoreHealthyChecker( - channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout()); - this.storeChecker = storeChecker; - this.executor = Executors.newScheduledThreadPool(1); - this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); - this.cacheInvalidateCallback = cacheInvalidateCallBack; + this.cacheInvalidateCallbackList = new ArrayList<>(); } public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { @@ -111,19 +93,7 @@ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { this.conf = conf; this.storeChecker = null; this.executor = null; - this.cacheInvalidateCallback = null; - } - - public RegionManager( - TiConfiguration conf, - ReadOnlyPDClient pdClient, - Function cacheInvalidateCallback) { - this.cache = new RegionCache(); - this.pdClient = pdClient; - this.conf = conf; - this.storeChecker = null; - this.executor = null; - this.cacheInvalidateCallback = cacheInvalidateCallback; + this.cacheInvalidateCallbackList= new ArrayList<>(); } public synchronized void close() { @@ -136,13 +106,13 @@ public ReadOnlyPDClient getPDClient() { return this.pdClient; } - public synchronized Function getCacheInvalidateCallback() { - return cacheInvalidateCallback; + public synchronized List> getCacheInvalidateCallbackList() { + return cacheInvalidateCallbackList; } - public synchronized void setCacheInvalidateCallback( + public synchronized void addCacheInvalidateCallback( Function cacheInvalidateCallback) { - this.cacheInvalidateCallback = cacheInvalidateCallback; + this.cacheInvalidateCallbackList.add(cacheInvalidateCallback); } public void invalidateAll() { diff --git a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java index a3a2a683012..c426eff670e 100644 --- a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java +++ b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java @@ -52,11 +52,13 @@ private RegionStoreClient createClient( .build(); TiStore store = new TiStore(meta); + RegionManager manager = new RegionManager(session.getConf(), session.getPDClient()); + manager.addCacheInvalidateCallback(cacheInvalidateCallBack); RegionStoreClientBuilder builder = new RegionStoreClientBuilder( session.getConf(), session.getChannelFactory(), - new RegionManager(session.getConf(), session.getPDClient(), cacheInvalidateCallBack), + manager, session.getPDClient()); return builder.build(region, store); From ece0b10ea80ac0f5b92d6e2df4c301ad8dcaf867 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Tue, 27 Sep 2022 11:40:22 +0800 Subject: [PATCH 03/13] format code Signed-off-by: qidi1 <1083369179@qq.com> --- .../org/tikv/common/operation/RegionErrorHandler.java | 9 +++++---- src/main/java/org/tikv/common/region/RegionManager.java | 2 +- .../java/org/tikv/common/CacheInvalidCallBackTest.java | 5 +---- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 8b506cd7ef2..9da29ea3536 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -279,7 +279,8 @@ private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { private void notifyRegionStoreCacheInvalidate( long regionId, long storeId, CacheInvalidateEvent.CacheType type) { if (cacheInvalidateCallBackList != null) { - for (Function cacheInvalidateCallBack : cacheInvalidateCallBackList) { + for (Function cacheInvalidateCallBack : + cacheInvalidateCallBackList) { cacheInvalidateCallBack.apply( new CacheInvalidateEvent(regionId, storeId, true, true, type)); } @@ -288,10 +289,10 @@ private void notifyRegionStoreCacheInvalidate( private void notifyRegionCacheInvalidate(long regionId, CacheInvalidateEvent.CacheType type) { if (cacheInvalidateCallBackList != null) { - for (Function cacheInvalidateCallBack : cacheInvalidateCallBackList) { + for (Function cacheInvalidateCallBack : + cacheInvalidateCallBackList) { try { - cacheInvalidateCallBack.apply( - new CacheInvalidateEvent(regionId, 0, true, true, type)); + cacheInvalidateCallBack.apply(new CacheInvalidateEvent(regionId, 0, true, true, type)); } catch (Exception e) { logger.warn(String.format("CacheInvalidCallBack failed %s", e)); } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 8958ae42146..331d96ad478 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -93,7 +93,7 @@ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { this.conf = conf; this.storeChecker = null; this.executor = null; - this.cacheInvalidateCallbackList= new ArrayList<>(); + this.cacheInvalidateCallbackList = new ArrayList<>(); } public synchronized void close() { diff --git a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java index c426eff670e..c612ca9c12a 100644 --- a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java +++ b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java @@ -56,10 +56,7 @@ private RegionStoreClient createClient( manager.addCacheInvalidateCallback(cacheInvalidateCallBack); RegionStoreClientBuilder builder = new RegionStoreClientBuilder( - session.getConf(), - session.getChannelFactory(), - manager, - session.getPDClient()); + session.getConf(), session.getChannelFactory(), manager, session.getPDClient()); return builder.build(region, store); } From 454f689b3e5a8e364f893fdd97f7d32439e7dbbe Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Tue, 27 Sep 2022 15:25:34 +0800 Subject: [PATCH 04/13] change as comment Signed-off-by: qidi1 <1083369179@qq.com> --- .../common/event/CacheInvalidateEvent.java | 2 + .../common/operation/RegionErrorHandler.java | 67 ++++++++++++------- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java index 10d21942c91..ca7d73bac30 100644 --- a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java +++ b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java @@ -97,6 +97,8 @@ public String toString() { public enum CacheType implements Serializable { REGION_STORE, + STORE, + REGION, REQ_FAILED, LEADER } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 9da29ea3536..e073de658c3 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -110,7 +110,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { if (!retry) { this.regionManager.invalidateRegion(recv.getRegion()); - notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.LEADER); + notifyRegionLeaderError(recv.getRegion()); } backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); @@ -163,7 +163,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // key requested is not in current region // should not happen here. ByteString invalidKey = error.getKeyNotInRegion().getKey(); - notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.REGION_STORE); + notifyRegionCacheInvalidate(recv.getRegion()); logger.error( String.format( "Key not in region [%s] for key [%s], this error should not happen here.", @@ -192,7 +192,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { private boolean onRegionEpochNotMatch(BackOffer backOffer, List currentRegions) { if (currentRegions.size() == 0) { this.regionManager.onRegionStale(recv.getRegion()); - notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.REGION_STORE); + notifyRegionCacheInvalidate(recv.getRegion()); return false; } @@ -226,7 +226,7 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List c } if (needInvalidateOld) { - notifyRegionCacheInvalidate(recv.getRegion().getId(), CacheType.REGION_STORE); + notifyRegionCacheInvalidate(recv.getRegion()); this.regionManager.onRegionStale(recv.getRegion()); } @@ -269,34 +269,49 @@ public TiRegion getRegion() { return recv.getRegion(); } + private void notifyRegionRequestError( + TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) { + CacheInvalidateEvent event; + switch (type) { + case REGION: + case LEADER: + event = new CacheInvalidateEvent(ctxRegion.getId(), 0, true, false, type); + break; + case REGION_STORE: + event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type); + break; + case STORE: + event = new CacheInvalidateEvent(0, storeId, false, true, type); + break; + case REQ_FAILED: + event = new CacheInvalidateEvent(0, 0, false, false, type); + break; + default: + throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); + } + if (cacheInvalidateCallBackList != null) { + for (Function cacheInvalidateCallBack : + cacheInvalidateCallBackList) { + try { + cacheInvalidateCallBack.apply(event); + } catch (Exception e) { + logger.warn(String.format("CacheInvalidCallBack failed %s", e)); + } + } + } + } + private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { regionManager.invalidateRegion(ctxRegion); regionManager.invalidateStore(storeId); - notifyRegionStoreCacheInvalidate( - ctxRegion.getId(), storeId, CacheInvalidateEvent.CacheType.REGION_STORE); + notifyRegionRequestError(ctxRegion, storeId, CacheInvalidateEvent.CacheType.REGION); } - private void notifyRegionStoreCacheInvalidate( - long regionId, long storeId, CacheInvalidateEvent.CacheType type) { - if (cacheInvalidateCallBackList != null) { - for (Function cacheInvalidateCallBack : - cacheInvalidateCallBackList) { - cacheInvalidateCallBack.apply( - new CacheInvalidateEvent(regionId, storeId, true, true, type)); - } - } + private void notifyRegionCacheInvalidate(TiRegion ctxRegion) { + notifyRegionRequestError(ctxRegion, 0, CacheType.REGION); } - private void notifyRegionCacheInvalidate(long regionId, CacheInvalidateEvent.CacheType type) { - if (cacheInvalidateCallBackList != null) { - for (Function cacheInvalidateCallBack : - cacheInvalidateCallBackList) { - try { - cacheInvalidateCallBack.apply(new CacheInvalidateEvent(regionId, 0, true, true, type)); - } catch (Exception e) { - logger.warn(String.format("CacheInvalidCallBack failed %s", e)); - } - } - } + private void notifyRegionLeaderError(TiRegion ctxRegion) { + notifyRegionRequestError(ctxRegion, 0, CacheType.LEADER); } } From 5ef3118edffa1b93271d60738f701b3fe7aea79c Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Tue, 27 Sep 2022 20:47:39 +0800 Subject: [PATCH 05/13] change to synchronized Signed-off-by: qidi1 <1083369179@qq.com> --- .../org/tikv/common/operation/RegionErrorHandler.java | 10 ++++------ .../java/org/tikv/common/region/RegionManager.java | 7 ++++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index e073de658c3..a06401eb0b5 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -120,7 +120,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // this error is reported from raftstore: // store_id requested at the moment is inconsistent with that expected // Solution:re-fetch from PD - long storeId = recv.getRegion().getLeader().getStoreId(); + long storeId = error.getStoreNotMatch().getRequestStoreId(); long actualStoreId = error.getStoreNotMatch().getActualStoreId(); logger.warn( String.format( @@ -163,7 +163,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // key requested is not in current region // should not happen here. ByteString invalidKey = error.getKeyNotInRegion().getKey(); - notifyRegionCacheInvalidate(recv.getRegion()); logger.error( String.format( "Key not in region [%s] for key [%s], this error should not happen here.", @@ -280,9 +279,6 @@ private void notifyRegionRequestError( case REGION_STORE: event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type); break; - case STORE: - event = new CacheInvalidateEvent(0, storeId, false, true, type); - break; case REQ_FAILED: event = new CacheInvalidateEvent(0, 0, false, false, type); break; @@ -290,6 +286,7 @@ private void notifyRegionRequestError( throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); } if (cacheInvalidateCallBackList != null) { + new Thread(() -> { for (Function cacheInvalidateCallBack : cacheInvalidateCallBackList) { try { @@ -298,13 +295,14 @@ private void notifyRegionRequestError( logger.warn(String.format("CacheInvalidCallBack failed %s", e)); } } + }).start(); } } private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { regionManager.invalidateRegion(ctxRegion); regionManager.invalidateStore(storeId); - notifyRegionRequestError(ctxRegion, storeId, CacheInvalidateEvent.CacheType.REGION); + notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE); } private void notifyRegionCacheInvalidate(TiRegion ctxRegion) { diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 331d96ad478..a6d7e7fbf7c 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -23,6 +23,7 @@ import io.prometheus.client.Histogram; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -93,7 +94,7 @@ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { this.conf = conf; this.storeChecker = null; this.executor = null; - this.cacheInvalidateCallbackList = new ArrayList<>(); + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); } public synchronized void close() { @@ -106,11 +107,11 @@ public ReadOnlyPDClient getPDClient() { return this.pdClient; } - public synchronized List> getCacheInvalidateCallbackList() { + public List> getCacheInvalidateCallbackList() { return cacheInvalidateCallbackList; } - public synchronized void addCacheInvalidateCallback( + public void addCacheInvalidateCallback( Function cacheInvalidateCallback) { this.cacheInvalidateCallbackList.add(cacheInvalidateCallback); } From a49125ca79d7ebb33af01df95993979944c5fa83 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Wed, 28 Sep 2022 11:40:22 +0800 Subject: [PATCH 06/13] change list to copy on write Signed-off-by: qidi1 <1083369179@qq.com> --- src/main/java/org/tikv/common/region/RegionManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index a6d7e7fbf7c..0f66db4d4e3 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -71,7 +71,7 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; - private final List> cacheInvalidateCallbackList; + private final CopyOnWriteArrayList> cacheInvalidateCallbackList; public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { @@ -85,7 +85,7 @@ public RegionManager( this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); - this.cacheInvalidateCallbackList = new ArrayList<>(); + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); } public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { From 922de9b43fcfc8712bd5f7881bc2f9e6678fc683 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Wed, 28 Sep 2022 16:54:03 +0800 Subject: [PATCH 07/13] change to muti thread Signed-off-by: qidi1 <1083369179@qq.com> --- .../common/operation/RegionErrorHandler.java | 14 ++++--- .../org/tikv/common/region/RegionManager.java | 39 ++++++++++++++----- .../tikv/common/CacheInvalidCallBackTest.java | 11 +++++- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index a06401eb0b5..ba9ca09efda 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -21,6 +21,7 @@ import io.grpc.StatusRuntimeException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,7 @@ public class RegionErrorHandler implements ErrorHandler { private final RegionErrorReceiver recv; private final List> cacheInvalidateCallBackList; + private final ExecutorService callBackThreadPool; public RegionErrorHandler( RegionManager regionManager, RegionErrorReceiver recv, @@ -54,6 +56,7 @@ public RegionErrorHandler( this.regionManager = regionManager; this.getRegionError = getRegionError; this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList(); + this.callBackThreadPool = regionManager.getCallBackThreadPool(); } @Override @@ -286,16 +289,17 @@ private void notifyRegionRequestError( throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); } if (cacheInvalidateCallBackList != null) { - new Thread(() -> { - for (Function cacheInvalidateCallBack : - cacheInvalidateCallBackList) { + for (Function cacheInvalidateCallBack : + cacheInvalidateCallBackList) { + callBackThreadPool.submit(() -> { try { cacheInvalidateCallBack.apply(event); } catch (Exception e) { logger.warn(String.format("CacheInvalidCallBack failed %s", e)); } - } - }).start(); + }); + + } } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 0f66db4d4e3..4c5feeb369e 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -73,8 +74,31 @@ public class RegionManager { private final StoreHealthyChecker storeChecker; private final CopyOnWriteArrayList> cacheInvalidateCallbackList; + private final ExecutorService callBackThreadPool; + public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { + this(conf, pdClient, channelFactory, 1); + } + + public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { + this(conf, pdClient, 1); + } + + public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient, + int callBackExecutorThreadNum) { + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; + this.storeChecker = null; + this.executor = null; + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); + this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); + } + + public RegionManager( + TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory, + int callBackExecutorThreadNum) { this.cache = new RegionCache(); this.pdClient = pdClient; this.conf = conf; @@ -86,27 +110,24 @@ public RegionManager( this.executor = Executors.newScheduledThreadPool(1); this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); - } - - public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { - this.cache = new RegionCache(); - this.pdClient = pdClient; - this.conf = conf; - this.storeChecker = null; - this.executor = null; - this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); + this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); } public synchronized void close() { if (this.executor != null) { this.executor.shutdownNow(); } + this.callBackThreadPool.shutdownNow(); } public ReadOnlyPDClient getPDClient() { return this.pdClient; } + public ExecutorService getCallBackThreadPool() { + return callBackThreadPool; + } + public List> getCacheInvalidateCallbackList() { return cacheInvalidateCallbackList; } diff --git a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java index c612ca9c12a..5e4f0a992a0 100644 --- a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java +++ b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java @@ -81,7 +81,6 @@ public void doRawGetTest( } catch (Exception e) { assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size()); } - server.putError( "failure", () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance())); @@ -89,6 +88,7 @@ public void doRawGetTest( client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); fail(); } catch (Exception e) { + sleep(1000); assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size()); } server.putError( @@ -98,12 +98,21 @@ public void doRawGetTest( client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); fail(); } catch (Exception e) { + sleep(1000); assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size()); } server.clearAllMap(); client.close(); } + private void sleep(int time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + fail(); + } + } + private BackOffer defaultBackOff() { return ConcreteBackOffer.newCustomBackOff(1000); } From a8fe9c357a12799ab5a396fea76c0d7439742143 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Wed, 28 Sep 2022 16:58:18 +0800 Subject: [PATCH 08/13] format code Signed-off-by: qidi1 <1083369179@qq.com> --- .../common/operation/RegionErrorHandler.java | 17 +++++++++-------- .../org/tikv/common/region/RegionManager.java | 11 +++++++---- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index ba9ca09efda..104069fb5a8 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -48,6 +48,7 @@ public class RegionErrorHandler implements ErrorHandler { private final List> cacheInvalidateCallBackList; private final ExecutorService callBackThreadPool; + public RegionErrorHandler( RegionManager regionManager, RegionErrorReceiver recv, @@ -291,14 +292,14 @@ private void notifyRegionRequestError( if (cacheInvalidateCallBackList != null) { for (Function cacheInvalidateCallBack : cacheInvalidateCallBackList) { - callBackThreadPool.submit(() -> { - try { - cacheInvalidateCallBack.apply(event); - } catch (Exception e) { - logger.warn(String.format("CacheInvalidCallBack failed %s", e)); - } - }); - + callBackThreadPool.submit( + () -> { + try { + cacheInvalidateCallBack.apply(event); + } catch (Exception e) { + logger.warn(String.format("CacheInvalidCallBack failed %s", e)); + } + }); } } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 4c5feeb369e..3d4eefae575 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -72,7 +72,8 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; - private final CopyOnWriteArrayList> cacheInvalidateCallbackList; + private final CopyOnWriteArrayList> + cacheInvalidateCallbackList; private final ExecutorService callBackThreadPool; @@ -85,8 +86,8 @@ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { this(conf, pdClient, 1); } - public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient, - int callBackExecutorThreadNum) { + public RegionManager( + TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) { this.cache = new RegionCache(); this.pdClient = pdClient; this.conf = conf; @@ -97,7 +98,9 @@ public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient, } public RegionManager( - TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory, + TiConfiguration conf, + ReadOnlyPDClient pdClient, + ChannelFactory channelFactory, int callBackExecutorThreadNum) { this.cache = new RegionCache(); this.pdClient = pdClient; From 1fd41b6b3d9b320fd7eef8b5b682a75847668861 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Wed, 28 Sep 2022 17:03:32 +0800 Subject: [PATCH 09/13] add comment Signed-off-by: qidi1 <1083369179@qq.com> --- src/main/java/org/tikv/common/operation/RegionErrorHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 104069fb5a8..f9f03a6f835 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -275,6 +275,7 @@ public TiRegion getRegion() { private void notifyRegionRequestError( TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) { CacheInvalidateEvent event; + // When store(region) id is 0, it implies that the error was not caused by store(region). switch (type) { case REGION: case LEADER: From be66047da96bd49bab3f68417b8926b4eb1c3d61 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Wed, 28 Sep 2022 22:40:36 +0800 Subject: [PATCH 10/13] change to magical num Signed-off-by: qidi1 <1083369179@qq.com> --- .../java/org/tikv/common/operation/RegionErrorHandler.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index f9f03a6f835..7dd2782f042 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -48,6 +48,8 @@ public class RegionErrorHandler implements ErrorHandler { private final List> cacheInvalidateCallBackList; private final ExecutorService callBackThreadPool; + private final int INVALID_STORE_ID = 0; + private final int INVALID_REGION_ID = 0; public RegionErrorHandler( RegionManager regionManager, @@ -279,13 +281,13 @@ private void notifyRegionRequestError( switch (type) { case REGION: case LEADER: - event = new CacheInvalidateEvent(ctxRegion.getId(), 0, true, false, type); + event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type); break; case REGION_STORE: event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type); break; case REQ_FAILED: - event = new CacheInvalidateEvent(0, 0, false, false, type); + event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type); break; default: throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); @@ -299,6 +301,7 @@ private void notifyRegionRequestError( cacheInvalidateCallBack.apply(event); } catch (Exception e) { logger.warn(String.format("CacheInvalidCallBack failed %s", e)); + throw e; } }); } From b2db42ec867525873cb2cd93737a1aa9458ac128 Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Thu, 29 Sep 2022 09:39:20 +0800 Subject: [PATCH 11/13] add comment Signed-off-by: qidi1 <1083369179@qq.com> --- .../java/org/tikv/common/operation/RegionErrorHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 7dd2782f042..b45d238f97c 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -277,7 +277,8 @@ public TiRegion getRegion() { private void notifyRegionRequestError( TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) { CacheInvalidateEvent event; - // When store(region) id is 0, it implies that the error was not caused by store(region). + // When store(region) id is invalid, + // it implies that the error was not caused by store(region) error. switch (type) { case REGION: case LEADER: From a0e00fc85dff829b59d6726f1c1541c0328745ab Mon Sep 17 00:00:00 2001 From: qidi1 <1083369179@qq.com> Date: Thu, 29 Sep 2022 12:59:06 +0800 Subject: [PATCH 12/13] change log levle Signed-off-by: qidi1 <1083369179@qq.com> --- .../java/org/tikv/common/operation/RegionErrorHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index b45d238f97c..5eef2759d79 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -301,8 +301,7 @@ private void notifyRegionRequestError( try { cacheInvalidateCallBack.apply(event); } catch (Exception e) { - logger.warn(String.format("CacheInvalidCallBack failed %s", e)); - throw e; + logger.error(String.format("CacheInvalidCallBack failed %s", e)); } }); } From 21354fa9e32b929f32e7cc3463ef811358e5a996 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Sun, 29 Jan 2023 11:15:33 +0800 Subject: [PATCH 13/13] Fmt --- src/main/java/org/tikv/common/region/RegionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 6951d129945..37c3d73f759 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -28,8 +28,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient;