diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 5938d56a396..8a2f8e3c99d 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -51,14 +51,17 @@ public class ConfigUtils { public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping"; public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward"; public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout"; + public static final String TIKV_HEALTH_CHECK_PERIOD_DURATION = + "tikv.health_check_period_duration"; public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; - public static final String DEF_TIMEOUT = "300ms"; - public static final String DEF_FORWARD_TIMEOUT = "600ms"; + public static final String DEF_TIMEOUT = "200ms"; + public static final String DEF_FORWARD_TIMEOUT = "300ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; public static final int DEF_CHECK_HEALTH_TIMEOUT = 100; + public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300; public static final int DEF_SCAN_BATCH_SIZE = 10240; public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 4ee8eac9c54..5337c1e7618 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -80,6 +80,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME); setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE); setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT); + setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION); setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS); } @@ -265,6 +266,7 @@ private static ReplicaRead getReplicaRead(String key) { private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE); private int metricsPort = getInt(TIKV_METRICS_PORT); private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT); + private int healthCheckPeriodDuration = getInt(TIKV_HEALTH_CHECK_PERIOD_DURATION); private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME); private HostMapping hostMapping = null; @@ -573,6 +575,10 @@ public long getGrpcHealthCheckTimeout() { return this.grpcHealthCheckTimeout; } + public long getHealthCheckPeriodDuration() { + return this.healthCheckPeriodDuration; + } + public boolean isEnableAtomicForCAS() { return enableAtomicForCAS; } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index a7b6c543085..2bf1e8a94a7 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -26,12 +26,10 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.catalog.Catalog; -import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.meta.TiTimestamp; @@ -56,7 +54,6 @@ public class TiSession implements AutoCloseable { private static final Map sessionCachedMap = new HashMap<>(); private final TiConfiguration conf; private final ChannelFactory channelFactory; - private Function cacheInvalidateCallback; // below object creation is either heavy or making connection (pd), pending for lazy loading private volatile PDClient client; private volatile Catalog catalog; @@ -182,13 +179,7 @@ public RegionManager getRegionManager() { if (res == null) { synchronized (this) { if (regionManager == null) { - regionManager = - new RegionManager( - getConf(), - getPDClient(), - this.cacheInvalidateCallback, - this.channelFactory, - this.enableGrpcForward); + regionManager = new RegionManager(getConf(), getPDClient(), this.channelFactory); } res = regionManager; } @@ -331,15 +322,6 @@ public ChannelFactory getChannelFactory() { return channelFactory; } - /** - * This is used for setting call back function to invalidate cache information - * - * @param callBackFunc callback function - */ - public void injectCallBackFunc(Function callBackFunc) { - this.cacheInvalidateCallback = callBackFunc; - } - /** * split region and scatter * diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 9edb94c0f46..c47bb854746 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -125,6 +125,14 @@ public boolean onNotLeader(TiRegion newRegion) { @Override public boolean onStoreUnreachable() { + if (!targetStore.isValid()) { + logger.warn( + String.format("store [%d] has been invalid", region.getId(), targetStore.getId())); + targetStore = regionManager.getStoreById(targetStore.getId()); + updateClientStub(); + return true; + } + if (targetStore.getProxyStore() == null) { if (targetStore.isReachable()) { return true; @@ -238,20 +246,22 @@ private boolean retryOtherStoreLeader() { private void updateClientStub() { String addressStr = targetStore.getStore().getAddress(); + if (targetStore.getProxyStore() != null) { + addressStr = targetStore.getProxyStore().getAddress(); + } ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); + if (targetStore.getProxyStore() != null) { + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress()); + blockingStub = MetadataUtils.attachHeaders(blockingStub, header); + asyncStub = MetadataUtils.attachHeaders(asyncStub, header); + } } private boolean retryOtherStoreByProxyForward() { - if (!targetStore.isValid()) { - targetStore = regionManager.getStoreById(targetStore.getId()); - logger.warn( - String.format("store [%d] has been invalid", region.getId(), targetStore.getId())); - return true; - } - TiStore proxyStore = switchProxyStore(); if (proxyStore == null) { logger.warn( @@ -268,19 +278,13 @@ private boolean retryOtherStoreByProxyForward() { } targetStore = proxyStore; retryForwardTimes += 1; + updateClientStub(); logger.warn( String.format( "forward request to store [%s] by store [%s] for region[%d]", targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress(), region.getId())); - String addressStr = targetStore.getProxyStore().getAddress(); - ManagedChannel channel = - channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress()); - blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); - asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); return true; } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 5aad400ed80..36a3cc2474a 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -25,13 +25,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.TiConfiguration; -import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.util.BackOffer; @@ -59,50 +57,24 @@ public class RegionManager { private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; - private final Function cacheInvalidateCallback; - - // To avoid double retrieval, we used the async version of grpc - // When rpc not returned, instead of call again, it wait for previous one done - public RegionManager( - TiConfiguration conf, - ReadOnlyPDClient pdClient, - Function cacheInvalidateCallback) { - this.cache = new RegionCache(); - this.pdClient = pdClient; - this.conf = conf; - this.cacheInvalidateCallback = cacheInvalidateCallback; - this.executor = null; - this.storeChecker = null; - } - public RegionManager( - TiConfiguration conf, - ReadOnlyPDClient pdClient, - Function cacheInvalidateCallback, - ChannelFactory channelFactory, - boolean enableGrpcForward) { + TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { this.cache = new RegionCache(); - this.cacheInvalidateCallback = cacheInvalidateCallback; this.pdClient = pdClient; this.conf = conf; - - if (enableGrpcForward) { - StoreHealthyChecker storeChecker = - new StoreHealthyChecker(channelFactory, pdClient, this.cache); - this.storeChecker = storeChecker; - this.executor = Executors.newScheduledThreadPool(1); - this.executor.scheduleAtFixedRate(storeChecker, 1, 1, TimeUnit.SECONDS); - } else { - this.storeChecker = null; - this.executor = null; - } + long period = conf.getHealthCheckPeriodDuration(); + StoreHealthyChecker storeChecker = + new StoreHealthyChecker( + channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout()); + this.storeChecker = storeChecker; + this.executor = Executors.newScheduledThreadPool(1); + this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); } public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { this.cache = new RegionCache(); this.pdClient = pdClient; this.conf = conf; - this.cacheInvalidateCallback = null; this.storeChecker = null; this.executor = null; } @@ -113,10 +85,6 @@ public synchronized void close() { } } - public Function getCacheInvalidateCallback() { - return cacheInvalidateCallback; - } - public ReadOnlyPDClient getPDClient() { return this.pdClient; } @@ -234,7 +202,7 @@ public TiStore getStoreById(long id, BackOffer backOffer) { if (store.getStore().getState().equals(StoreState.Tombstone)) { return null; } - if (cache.putStore(id, store)) { + if (cache.putStore(id, store) && storeChecker != null) { storeChecker.scheduleStoreHealthCheck(store); } return store; @@ -266,7 +234,7 @@ public TiRegion updateLeader(TiRegion region, long storeId) { } public synchronized void updateStore(TiStore oldStore, TiStore newStore) { - if (cache.updateStore(oldStore, newStore)) { + if (cache.updateStore(oldStore, newStore) && storeChecker != null) { storeChecker.scheduleStoreHealthCheck(newStore); } } @@ -285,24 +253,6 @@ public synchronized void onRequestFail(TiRegion region) { cache.invalidateRegion(region); } - /** If region has changed, return the new one and update cache. */ - public TiRegion getRegionSkipCache(TiRegion region) { - BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); - try { - Pair regionAndLeader = - pdClient.getRegionByID(backOffer, region.getId()); - if (!regionAndLeader.first.equals(region.getMeta())) { - region = createRegion(regionAndLeader.first, regionAndLeader.second, backOffer); - return cache.putRegion(region); - } else { - logger.warn("Cannot get region from PD for region id: " + region.getId()); - return null; - } - } catch (Exception e) { - return null; - } - } - public void invalidateStore(long storeId) { cache.invalidateStore(storeId); } diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index 352725f5fed..fbc75cb534f 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -19,20 +19,21 @@ public class StoreHealthyChecker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class); private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60; - private static final long SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK = 500; private BlockingQueue taskQueue; private final ChannelFactory channelFactory; private final ReadOnlyPDClient pdClient; private final RegionCache cache; private long checkTombstoneTick; + private long timeout; public StoreHealthyChecker( - ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache) { + ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) { this.taskQueue = new LinkedBlockingQueue<>(); this.channelFactory = channelFactory; this.pdClient = pdClient; this.cache = cache; this.checkTombstoneTick = 0; + this.timeout = timeout; } public boolean scheduleStoreHealthCheck(TiStore store) { @@ -64,7 +65,7 @@ private boolean checkStoreHealth(TiStore store) { try { ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); HealthGrpc.HealthBlockingStub stub = - HealthGrpc.newBlockingStub(channel).withDeadlineAfter(200, TimeUnit.MILLISECONDS); + HealthGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); HealthCheckResponse resp = stub.check(req); if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { @@ -133,7 +134,7 @@ public void run() { } if (!unreachableStore.isEmpty()) { try { - Thread.sleep(SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK); + Thread.sleep(timeout); } catch (Exception e) { this.taskQueue.addAll(unreachableStore); return; diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 003830c3eb6..b808bf6325f 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -61,7 +61,7 @@ public void getRegionByKey() throws Exception { int confVer = 1026; int ver = 1027; long regionId = 233; - String testAddress = "testAddress"; + String testAddress = "127.0.0.1"; pdServer.addGetRegionResp( GrpcUtils.makeGetRegionResponse( pdServer.getClusterId(), @@ -92,11 +92,7 @@ public void getRegionByKey() throws Exception { // This will in turn invoke rpc and results in an error // since we set just one rpc response - try { - mgr.getRegionByKey(searchKeyNotExists); - fail(); - } catch (Exception ignored) { - } + assertNull(mgr.getRegionByKey(searchKeyNotExists)); } @Test