diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 48254dbc7a0..827545b7d16 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -25,6 +25,7 @@ public class ConfigUtils { public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms"; public static final String TIKV_GRPC_INGEST_TIMEOUT = "tikv.grpc.ingest_timeout_in_ms"; public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms"; + public static final String TIKV_GRPC_WARM_UP_TIMEOUT = "tikv.grpc.warm_up_timeout_in_ms"; public static final String TIKV_PD_FIRST_GET_MEMBER_TIMEOUT = "tikv.grpc.pd_first_get_member_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms"; @@ -111,11 +112,15 @@ public class ConfigUtils { public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = "tikv.circuit_break.trigger.attempt_request_count"; + public static final String TIKV_SCAN_REGIONS_LIMIT = "tikv.scan_regions_limit"; + public static final String TIFLASH_ENABLE = "tiflash.enable"; + public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s"; public static final String DEF_FORWARD_TIMEOUT = "300ms"; + public static final String DEF_TIKV_GRPC_WARM_UP_TIMEOUT = "5000ms"; public static final String DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT = "10000ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; public static final int DEF_CHECK_HEALTH_TIMEOUT = 100; @@ -182,6 +187,7 @@ public class ConfigUtils { public static final boolean DEF_TIKV_TLS_ENABLE = false; public static final boolean DEF_TIKV_USE_JKS = false; public static final boolean DEF_TIFLASH_ENABLE = false; + public static final boolean DEF_TIKV_WARM_UP_ENABLE = true; public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false; public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60; @@ -189,4 +195,6 @@ public class ConfigUtils { public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10; public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20; public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10; + + public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000; } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index a4c3837ca6f..b609c85804f 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -341,6 +341,27 @@ public Pair getRegionByID(BackOffer backOffer, long return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); } + @Override + public List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { + // no need to backoff because ScanRegions is just for optimization + // introduce a warm-up timeout for ScanRegions requests + PDGrpc.PDBlockingStub stub = + getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS); + Pdpb.ScanRegionsRequest request = + Pdpb.ScanRegionsRequest.newBuilder() + .setHeader(header) + .setStartKey(startKey) + .setEndKey(endKey) + .setLimit(limit) + .build(); + Pdpb.ScanRegionsResponse resp = stub.scanRegions(request); + if (resp == null) { + return null; + } + return resp.getRegionsList(); + } + private Supplier buildGetStoreReq(long storeId) { return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build(); } diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 63518b9377b..802d258bc8f 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -22,6 +22,7 @@ import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; +import org.tikv.kvproto.Pdpb; /** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */ public interface ReadOnlyPDClient { @@ -48,6 +49,9 @@ public interface ReadOnlyPDClient { */ Pair getRegionByID(BackOffer backOffer, long id); + List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit); + HostMapping getHostMapping(); /** diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 2c89919e17f..de417408309 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -86,6 +86,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT); setIfMissing(TIKV_GRPC_INGEST_TIMEOUT, DEF_TIKV_GRPC_INGEST_TIMEOUT); setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT); + setIfMissing(TIKV_GRPC_WARM_UP_TIMEOUT, DEF_TIKV_GRPC_WARM_UP_TIMEOUT); setIfMissing(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT, DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE); @@ -124,6 +125,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_TLS_ENABLE, DEF_TIKV_TLS_ENABLE); setIfMissing(TIKV_USE_JKS, DEF_TIKV_USE_JKS); setIfMissing(TIFLASH_ENABLE, DEF_TIFLASH_ENABLE); + setIfMissing(TIKV_WARM_UP_ENABLE, DEF_TIKV_WARM_UP_ENABLE); setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS); @@ -146,6 +148,7 @@ private static void loadFromDefaultProperties() { TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); setIfMissing( TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT); } public static void listAll() { @@ -309,6 +312,7 @@ private static ReplicaRead getReplicaRead(String key) { private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT); private long ingestTimeout = getTimeAsMs(TIKV_GRPC_INGEST_TIMEOUT); private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT); + private long warmUpTimeout = getTimeAsMs(TIKV_GRPC_WARM_UP_TIMEOUT); private long pdFirstGetMemberTimeout = getTimeAsMs(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT); private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT); private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE); @@ -376,6 +380,7 @@ private static ReplicaRead getReplicaRead(String key) { private String jksTrustPassword = getOption(TIKV_JKS_TRUST_PASSWORD).orElse(null); private boolean tiFlashEnable = getBoolean(TIFLASH_ENABLE); + private boolean warmUpEnable = getBoolean(TIKV_WARM_UP_ENABLE); private boolean isTest = false; @@ -393,6 +398,8 @@ private static ReplicaRead getReplicaRead(String key) { private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT); + public enum KVMode { TXN, RAW @@ -475,6 +482,15 @@ public TiConfiguration setForwardTimeout(long timeout) { return this; } + public long getWarmUpTimeout() { + return warmUpTimeout; + } + + public TiConfiguration setWarmUpTimeout(long timeout) { + this.warmUpTimeout = timeout; + return this; + } + public long getPdFirstGetMemberTimeout() { return pdFirstGetMemberTimeout; } @@ -811,6 +827,14 @@ public boolean isTiFlashEnabled() { return tiFlashEnable; } + public boolean isWarmUpEnable() { + return warmUpEnable; + } + + public void setWarmUpEnable(boolean warmUpEnable) { + this.warmUpEnable = warmUpEnable; + } + public boolean isTlsEnable() { return tlsEnable; } @@ -1023,4 +1047,12 @@ public int getCircuitBreakAttemptRequestCount() { public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) { this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount; } + + public int getScanRegionsLimit() { + return scanRegionsLimit; + } + + public void setScanRegionsLimit(int scanRegionsLimit) { + this.scanRegionsLimit = scanRegionsLimit; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index db9e5602d05..ac84daec017 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -36,6 +36,7 @@ import org.tikv.common.util.*; import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; import org.tikv.raw.RawKVClient; import org.tikv.raw.SmartRawKVClient; import org.tikv.service.failsafe.CircuitBreaker; @@ -137,7 +138,9 @@ public TiSession(TiConfiguration conf) { if (this.enableGrpcForward) { logger.info("enable grpc forward for high available"); } - warmUp(); + if (conf.isWarmUpEnable() && conf.isRawKVMode()) { + warmUp(); + } this.circuitBreaker = new CircuitBreakerImpl(conf); logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } @@ -169,24 +172,39 @@ private synchronized void warmUp() { null, new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId()))); } - ByteString startKey = ByteString.EMPTY; + // use scan region to load region cache with limit + ByteString startKey = ByteString.EMPTY; do { - TiRegion region = regionManager.getRegionByKey(startKey); - startKey = region.getEndKey(); + List regions = + regionManager.scanRegions( + ConcreteBackOffer.newGetBackOff(), + startKey, + ByteString.EMPTY, + conf.getScanRegionsLimit()); + if (regions == null || regions.isEmpty()) { + // something went wrong, but the warm-up process could continue + break; + } + for (Pdpb.Region region : regions) { + regionManager.insertRegionToCache( + regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); + } + startKey = regions.get(regions.size() - 1).getRegion().getEndKey(); } while (!startKey.isEmpty()); - RawKVClient rawKVClient = createRawClient(); - ByteString exampleKey = ByteString.EMPTY; - Optional prev = rawKVClient.get(exampleKey); - if (prev.isPresent()) { - rawKVClient.delete(exampleKey); - rawKVClient.putIfAbsent(exampleKey, prev.get()); - rawKVClient.put(exampleKey, prev.get()); - } else { - rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY); - rawKVClient.put(exampleKey, ByteString.EMPTY); - rawKVClient.delete(exampleKey); + try (RawKVClient rawKVClient = createRawClient()) { + ByteString exampleKey = ByteString.EMPTY; + Optional prev = rawKVClient.get(exampleKey); + if (prev.isPresent()) { + rawKVClient.delete(exampleKey); + rawKVClient.putIfAbsent(exampleKey, prev.get()); + rawKVClient.put(exampleKey, prev.get()); + } else { + rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY); + rawKVClient.put(exampleKey, ByteString.EMPTY); + rawKVClient.delete(exampleKey); + } } } catch (Exception e) { // ignore error diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index b9fc08d73fd..0d22f430550 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -21,6 +21,7 @@ import com.google.protobuf.ByteString; import io.prometheus.client.Histogram; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,6 +42,7 @@ import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Peer; import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; @SuppressWarnings("UnstableApiUsage") public class RegionManager { @@ -50,6 +52,11 @@ public class RegionManager { .name("client_java_get_region_by_requests_latency") .help("getRegionByKey request latency.") .register(); + public static final Histogram SCAN_REGIONS_REQUEST_LATENCY = + Histogram.build() + .name("client_java_scan_regions_request_latency") + .help("scanRegions request latency.") + .register(); // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 @@ -95,6 +102,20 @@ public void invalidateAll() { cache.invalidateAll(); } + public List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { + Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer(); + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions"); + try { + return pdClient.scanRegions(backOffer, startKey, endKey, limit); + } catch (Exception e) { + return new ArrayList<>(); + } finally { + requestTimer.observeDuration(); + slowLogSpan.end(); + } + } + public TiRegion getRegionByKey(ByteString key) { return getRegionByKey(key, defaultBackOff()); } diff --git a/src/test/java/org/tikv/BaseRawKVTest.java b/src/test/java/org/tikv/BaseRawKVTest.java index b24b981ad72..7b6fa1aefa0 100644 --- a/src/test/java/org/tikv/BaseRawKVTest.java +++ b/src/test/java/org/tikv/BaseRawKVTest.java @@ -12,6 +12,7 @@ protected TiConfiguration createTiConfiguration() { ? TiConfiguration.createRawDefault() : TiConfiguration.createRawDefault(pdAddrsStr); conf.setTest(true); + conf.setEnableAtomicForCAS(true); conf.setEnableGrpcForward(false); return conf; } diff --git a/src/test/java/org/tikv/common/TiSessionTest.java b/src/test/java/org/tikv/common/TiSessionTest.java index 1cbc2ced5bd..8599e9510d4 100644 --- a/src/test/java/org/tikv/common/TiSessionTest.java +++ b/src/test/java/org/tikv/common/TiSessionTest.java @@ -14,11 +14,14 @@ import org.junit.After; import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.BaseRawKVTest; import org.tikv.common.region.TiRegion; import org.tikv.raw.RawKVClient; public class TiSessionTest extends BaseRawKVTest { + private static final Logger logger = LoggerFactory.getLogger(TiSessionTest.class); private TiSession session; @After @@ -122,4 +125,31 @@ private void doCloseTest(boolean now, long timeoutMS) throws Exception { assertTrue(e.getMessage().contains("rejected from java.util.concurrent.ThreadPoolExecutor")); } } + + @Test + public void warmUpTest() throws Exception { + TiConfiguration conf = createTiConfiguration(); + conf.setWarmUpEnable(true); + long t0 = doTest(conf); + conf.setWarmUpEnable(false); + long t1 = doTest(conf); + assertTrue(t0 < t1); + } + + private long doTest(TiConfiguration conf) throws Exception { + session = TiSession.create(conf); + long start = System.currentTimeMillis(); + try (RawKVClient client = session.createRawClient()) { + client.get(ByteString.EMPTY); + } + long end = System.currentTimeMillis(); + logger.info( + "[warm up " + + (conf.isWarmUpEnable() ? "enabled" : "disabled") + + "] duration " + + (end - start) + + "ms"); + session.close(); + return end - start; + } }