Patch summary (free text; everything above the first "diff --git" header is
skipped by patch tools).

What the patch changes:
  * ConfigUtils / TiConfiguration: add the "tikv.health_check_period_duration"
    key (default 300) with a getter, and change DEF_TIMEOUT from "300ms" to
    "200ms" and DEF_FORWARD_TIMEOUT from "600ms" to "300ms".
  * TiSession / RegionManager: remove the cacheInvalidateCallback field, its
    injectCallBackFunc / getCacheInvalidateCallback accessors and the two
    callback-taking RegionManager constructors; RegionManager is now built from
    (conf, pdClient, channelFactory).
  * RegionManager: schedule StoreHealthyChecker at a fixed rate of
    conf.getHealthCheckPeriodDuration() in TimeUnit.MILLISECONDS (was 1 second)
    and pass conf.getGrpcHealthCheckTimeout() to the checker.
  * StoreHealthyChecker: use the injected timeout both as the gRPC deadline
    (was a literal 200 ms) and as the sleep before re-checking unreachable
    stores (was the 500 ms SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK constant).
  * AbstractRegionStoreClient: move the "store is no longer valid" refresh from
    retryOtherStoreByProxyForward() to the top of onStoreUnreachable(), and make
    updateClientStub() build the forwarding stubs (proxy address plus the
    FORWARD_META_DATA_KEY header) whenever the target store has a proxy store.

Review notes:
  * FIX: the added "store [%d] has been invalid" warning passed
    (region.getId(), targetStore.getId()) to a format string with a single %d.
    String.format ignores surplus arguments, so the region id was logged as the
    store id. The added line now passes only targetStore.getId(). The removed
    ("-") copy of that statement is left untouched because it has to match the
    pre-image.
  * NOTE(review): line breaks and indentation were reconstructed from a
    whitespace-collapsed copy. Blank context lines were placed so that every
    hunk header's line counts add up, and indentation assumes the 2-space,
    100-column layout implied by the wrapped statements. Verify against the
    target tree before applying.
  * NOTE(review): the raw types in context/removed lines (Map, Function,
    BlockingQueue) look like generic arguments lost in extraction; confirm
    against the target tree.

diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java
index 9d1772f8013..e7c497cb015 100644
--- a/src/main/java/org/tikv/common/ConfigUtils.java
+++ b/src/main/java/org/tikv/common/ConfigUtils.java
@@ -51,12 +51,15 @@ public class ConfigUtils {
   public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
   public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
   public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";
+  public static final String TIKV_HEALTH_CHECK_PERIOD_DURATION =
+      "tikv.health_check_period_duration";
 
   public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
-  public static final String DEF_TIMEOUT = "300ms";
-  public static final String DEF_FORWARD_TIMEOUT = "600ms";
+  public static final String DEF_TIMEOUT = "200ms";
+  public static final String DEF_FORWARD_TIMEOUT = "300ms";
   public static final String DEF_SCAN_TIMEOUT = "20s";
   public static final int DEF_CHECK_HEALTH_TIMEOUT = 100;
+  public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300;
   public static final int DEF_SCAN_BATCH_SIZE = 10240;
   public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
   public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java
index 4712d6f1304..578086a53f1 100644
--- a/src/main/java/org/tikv/common/TiConfiguration.java
+++ b/src/main/java/org/tikv/common/TiConfiguration.java
@@ -80,6 +80,7 @@ private static void loadFromDefaultProperties() {
     setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
     setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
     setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
+    setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
   }
 
   public static void listAll() {
@@ -264,6 +265,7 @@ private static ReplicaRead getReplicaRead(String key) {
   private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
   private int metricsPort = getInt(TIKV_METRICS_PORT);
   private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
+  private int healthCheckPeriodDuration = getInt(TIKV_HEALTH_CHECK_PERIOD_DURATION);
   private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
   private HostMapping hostMapping = null;
 
@@ -569,4 +571,8 @@ public boolean getEnableGrpcForward() {
   public long getGrpcHealthCheckTimeout() {
     return this.grpcHealthCheckTimeout;
   }
+
+  public long getHealthCheckPeriodDuration() {
+    return this.healthCheckPeriodDuration;
+  }
 }
diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java
index a7b6c543085..2bf1e8a94a7 100644
--- a/src/main/java/org/tikv/common/TiSession.java
+++ b/src/main/java/org/tikv/common/TiSession.java
@@ -26,12 +26,10 @@
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.tikv.common.catalog.Catalog;
-import org.tikv.common.event.CacheInvalidateEvent;
 import org.tikv.common.exception.TiKVException;
 import org.tikv.common.key.Key;
 import org.tikv.common.meta.TiTimestamp;
@@ -56,7 +54,6 @@ public class TiSession implements AutoCloseable {
   private static final Map sessionCachedMap = new HashMap<>();
   private final TiConfiguration conf;
   private final ChannelFactory channelFactory;
-  private Function cacheInvalidateCallback;
   // below object creation is either heavy or making connection (pd), pending for lazy loading
   private volatile PDClient client;
   private volatile Catalog catalog;
@@ -182,13 +179,7 @@ public RegionManager getRegionManager() {
     if (res == null) {
       synchronized (this) {
         if (regionManager == null) {
-          regionManager =
-              new RegionManager(
-                  getConf(),
-                  getPDClient(),
-                  this.cacheInvalidateCallback,
-                  this.channelFactory,
-                  this.enableGrpcForward);
+          regionManager = new RegionManager(getConf(), getPDClient(), this.channelFactory);
         }
         res = regionManager;
       }
@@ -331,15 +322,6 @@ public ChannelFactory getChannelFactory() {
     return channelFactory;
   }
 
-  /**
-   * This is used for setting call back function to invalidate cache information
-   *
-   * @param callBackFunc callback function
-   */
-  public void injectCallBackFunc(Function callBackFunc) {
-    this.cacheInvalidateCallback = callBackFunc;
-  }
-
   /**
    * split region and scatter
    *
diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
index 9edb94c0f46..c47bb854746 100644
--- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
+++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
@@ -125,6 +125,14 @@ public boolean onNotLeader(TiRegion newRegion) {
 
   @Override
   public boolean onStoreUnreachable() {
+    if (!targetStore.isValid()) {
+      logger.warn(
+          String.format("store [%d] has been invalid", targetStore.getId()));
+      targetStore = regionManager.getStoreById(targetStore.getId());
+      updateClientStub();
+      return true;
+    }
+
     if (targetStore.getProxyStore() == null) {
       if (targetStore.isReachable()) {
         return true;
@@ -238,20 +246,22 @@ private boolean retryOtherStoreLeader() {
 
   private void updateClientStub() {
     String addressStr = targetStore.getStore().getAddress();
+    if (targetStore.getProxyStore() != null) {
+      addressStr = targetStore.getProxyStore().getAddress();
+    }
     ManagedChannel channel =
         channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
     blockingStub = TikvGrpc.newBlockingStub(channel);
     asyncStub = TikvGrpc.newStub(channel);
+    if (targetStore.getProxyStore() != null) {
+      Metadata header = new Metadata();
+      header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
+      blockingStub = MetadataUtils.attachHeaders(blockingStub, header);
+      asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
+    }
   }
 
   private boolean retryOtherStoreByProxyForward() {
-    if (!targetStore.isValid()) {
-      targetStore = regionManager.getStoreById(targetStore.getId());
-      logger.warn(
-          String.format("store [%d] has been invalid", region.getId(), targetStore.getId()));
-      return true;
-    }
-
     TiStore proxyStore = switchProxyStore();
     if (proxyStore == null) {
       logger.warn(
@@ -268,19 +278,13 @@ private boolean retryOtherStoreByProxyForward() {
     }
     targetStore = proxyStore;
     retryForwardTimes += 1;
+    updateClientStub();
     logger.warn(
         String.format(
             "forward request to store [%s] by store [%s] for region[%d]",
             targetStore.getStore().getAddress(),
             targetStore.getProxyStore().getAddress(),
             region.getId()));
-    String addressStr = targetStore.getProxyStore().getAddress();
-    ManagedChannel channel =
-        channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
-    Metadata header = new Metadata();
-    header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
-    blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
-    asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
     return true;
   }
 
diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java
index c7b62348169..af2bb1aae61 100644
--- a/src/main/java/org/tikv/common/region/RegionManager.java
+++ b/src/main/java/org/tikv/common/region/RegionManager.java
@@ -25,13 +25,11 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.tikv.common.ReadOnlyPDClient;
 import org.tikv.common.TiConfiguration;
-import org.tikv.common.event.CacheInvalidateEvent;
 import org.tikv.common.exception.GrpcException;
 import org.tikv.common.exception.TiClientInternalException;
 import org.tikv.common.util.BackOffer;
@@ -59,44 +57,24 @@ public class RegionManager {
   private final ScheduledExecutorService executor;
   private final StoreHealthyChecker storeChecker;
 
-  private final Function cacheInvalidateCallback;
-
-  // To avoid double retrieval, we used the async version of grpc
-  // When rpc not returned, instead of call again, it wait for previous one done
   public RegionManager(
-      TiConfiguration conf,
-      ReadOnlyPDClient pdClient,
-      Function cacheInvalidateCallback) {
+      TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) {
     this.cache = new RegionCache();
     this.pdClient = pdClient;
     this.conf = conf;
-    this.cacheInvalidateCallback = cacheInvalidateCallback;
-    this.executor = null;
-    this.storeChecker = null;
-  }
-
-  public RegionManager(
-      TiConfiguration conf,
-      ReadOnlyPDClient pdClient,
-      Function cacheInvalidateCallback,
-      ChannelFactory channelFactory,
-      boolean enableGrpcForward) {
-    this.cache = new RegionCache();
-    this.cacheInvalidateCallback = cacheInvalidateCallback;
-    this.pdClient = pdClient;
-    this.conf = conf;
+    long period = conf.getHealthCheckPeriodDuration();
     StoreHealthyChecker storeChecker =
-        new StoreHealthyChecker(channelFactory, pdClient, this.cache);
+        new StoreHealthyChecker(
+            channelFactory, pdClient, this.cache, conf.getGrpcHealthCheckTimeout());
     this.storeChecker = storeChecker;
     this.executor = Executors.newScheduledThreadPool(1);
-    this.executor.scheduleAtFixedRate(storeChecker, 1, 1, TimeUnit.SECONDS);
+    this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS);
   }
 
   public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) {
     this.cache = new RegionCache();
     this.pdClient = pdClient;
     this.conf = conf;
-    this.cacheInvalidateCallback = null;
     this.storeChecker = null;
     this.executor = null;
   }
@@ -107,10 +85,6 @@ public synchronized void close() {
     }
   }
 
-  public Function getCacheInvalidateCallback() {
-    return cacheInvalidateCallback;
-  }
-
   public ReadOnlyPDClient getPDClient() {
     return this.pdClient;
   }
diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java
index 352725f5fed..fbc75cb534f 100644
--- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java
+++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java
@@ -19,20 +19,21 @@
 public class StoreHealthyChecker implements Runnable {
   private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class);
   private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60;
-  private static final long SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK = 500;
   private BlockingQueue taskQueue;
   private final ChannelFactory channelFactory;
   private final ReadOnlyPDClient pdClient;
   private final RegionCache cache;
   private long checkTombstoneTick;
+  private long timeout;
 
   public StoreHealthyChecker(
-      ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache) {
+      ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) {
     this.taskQueue = new LinkedBlockingQueue<>();
     this.channelFactory = channelFactory;
     this.pdClient = pdClient;
     this.cache = cache;
     this.checkTombstoneTick = 0;
+    this.timeout = timeout;
   }
 
   public boolean scheduleStoreHealthCheck(TiStore store) {
@@ -64,7 +65,7 @@ private boolean checkStoreHealth(TiStore store) {
     try {
       ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
       HealthGrpc.HealthBlockingStub stub =
-          HealthGrpc.newBlockingStub(channel).withDeadlineAfter(200, TimeUnit.MILLISECONDS);
+          HealthGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
       HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
       HealthCheckResponse resp = stub.check(req);
       if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
@@ -133,7 +134,7 @@ public void run() {
     }
     if (!unreachableStore.isEmpty()) {
       try {
-        Thread.sleep(SLEEP_MILLI_SECONDS_AFTER_DOUBLE_CHECK);
+        Thread.sleep(timeout);
       } catch (Exception e) {
         this.taskQueue.addAll(unreachableStore);
         return;