From b68f4a48737eba78136fae827a98e93eaddfacbb Mon Sep 17 00:00:00 2001 From: birdstorm Date: Fri, 24 Dec 2021 11:48:25 +0800 Subject: [PATCH 1/3] [to #375] use scanRegions request to warm up client (#383) Signed-off-by: birdstorm --- .../java/org/tikv/common/ConfigUtils.java | 8 ++++ src/main/java/org/tikv/common/PDClient.java | 21 +++++++++ .../org/tikv/common/ReadOnlyPDClient.java | 4 ++ .../java/org/tikv/common/TiConfiguration.java | 32 ++++++++++++++ src/main/java/org/tikv/common/TiSession.java | 44 ++++++++++++++----- .../org/tikv/common/region/RegionManager.java | 21 +++++++++ .../java/org/tikv/common/TiSessionTest.java | 30 +++++++++++++ 7 files changed, 150 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index ff2d8664551..d2320141fbc 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -24,6 +24,7 @@ public class ConfigUtils { public static final String TIKV_PD_ADDRESSES = "tikv.pd.addresses"; public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms"; public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms"; + public static final String TIKV_GRPC_WARM_UP_TIMEOUT = "tikv.grpc.warm_up_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size"; public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size"; @@ -88,10 +89,14 @@ public class ConfigUtils { public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = "tikv.circuit_break.trigger.attempt_request_count"; + public static final String TIKV_SCAN_REGIONS_LIMIT = "tikv.scan_regions_limit"; + public static final String TIFLASH_ENABLE = "tiflash.enable"; + public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_FORWARD_TIMEOUT = "300ms"; + public static final String DEF_TIKV_GRPC_WARM_UP_TIMEOUT = "5000ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; public static final int DEF_CHECK_HEALTH_TIMEOUT = 100; public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300; @@ -148,6 +153,7 @@ public class ConfigUtils { public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER"; public static final int DEF_TIKV_GRPC_IDLE_TIMEOUT = 60; + public static final boolean DEF_TIKV_WARM_UP_ENABLE = true; public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false; public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60; @@ -155,4 +161,6 @@ public class ConfigUtils { public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10; public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20; public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10; + + public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000; } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 3d2e7f4189c..7760a2af89b 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -265,6 +265,27 @@ public Pair getRegionByID(BackOffer backOffer, long return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); } + @Override + public List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { + // no need to backoff because ScanRegions is just for optimization + // introduce a warm-up timeout for ScanRegions requests + PDGrpc.PDBlockingStub stub = + getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS); + Pdpb.ScanRegionsRequest request = + Pdpb.ScanRegionsRequest.newBuilder() + .setHeader(header) + .setStartKey(startKey) + .setEndKey(endKey) + .setLimit(limit) + .build(); + Pdpb.ScanRegionsResponse resp = stub.scanRegions(request); + if (resp == null) { + return null; + } + return resp.getRegionsList(); + } + private Supplier buildGetStoreReq(long storeId) { return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build(); } diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 63518b9377b..802d258bc8f 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -22,6 +22,7 @@ import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; +import org.tikv.kvproto.Pdpb; /** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */ public interface ReadOnlyPDClient { @@ -48,6 +49,9 @@ public interface ReadOnlyPDClient { */ Pair getRegionByID(BackOffer backOffer, long id); + List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit); + HostMapping getHostMapping(); /** diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index b76f78a0e9b..0c4c3b3416e 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -85,6 +85,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_PD_ADDRESSES, DEF_PD_ADDRESSES); setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT); setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT); + setIfMissing(TIKV_GRPC_WARM_UP_TIMEOUT, DEF_TIKV_GRPC_WARM_UP_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE); setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE); @@ -113,6 +114,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION); setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS); setIfMissing(TIKV_GRPC_IDLE_TIMEOUT, DEF_TIKV_GRPC_IDLE_TIMEOUT); + setIfMissing(TIKV_WARM_UP_ENABLE, DEF_TIKV_WARM_UP_ENABLE); setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS); @@ -135,6 +137,7 @@ private static void loadFromDefaultProperties() { TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); setIfMissing( TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT); } public static void listAll() { @@ -297,6 +300,7 @@ private static ReplicaRead getReplicaRead(String key) { private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT); private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT); + private long warmUpTimeout = getTimeAsMs(TIKV_GRPC_WARM_UP_TIMEOUT); private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT); private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE); private List pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES); @@ -342,6 +346,7 @@ private static ReplicaRead getReplicaRead(String key) { private Optional rawKVBatchWriteSlowLogInMS = getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS); private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS); + private boolean warmUpEnable = getBoolean(TIKV_WARM_UP_ENABLE); private int idleTimeout = getInt(TIKV_GRPC_IDLE_TIMEOUT); private boolean circuitBreakEnable = getBoolean(TiKV_CIRCUIT_BREAK_ENABLE); private int circuitBreakAvailabilityWindowInSeconds = @@ -353,6 +358,8 @@ private static ReplicaRead getReplicaRead(String key) { private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT); + public enum KVMode { TXN, RAW @@ -427,6 +434,15 @@ public TiConfiguration setForwardTimeout(long timeout) { return this; } + public long getWarmUpTimeout() { + return warmUpTimeout; + } + + public TiConfiguration setWarmUpTimeout(long timeout) { + this.warmUpTimeout = timeout; + return this; + } + public long getScanTimeout() { return scanTimeout; } @@ -679,6 +695,14 @@ public void setIdleTimeout(int timeout) { this.idleTimeout = timeout; } + public boolean isWarmUpEnable() { + return warmUpEnable; + } + + public void setWarmUpEnable(boolean warmUpEnable) { + this.warmUpEnable = warmUpEnable; + } + public int getRawKVReadTimeoutInMS() { return rawKVReadTimeoutInMS; } @@ -819,4 +843,12 @@ public int getCircuitBreakAttemptRequestCount() { public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) { this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount; } + + public int getScanRegionsLimit() { + return scanRegionsLimit; + } + + public void setScanRegionsLimit(int scanRegionsLimit) { + this.scanRegionsLimit = scanRegionsLimit; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 3fab40ccc56..c081d915fa1 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -40,6 +40,7 @@ import org.tikv.common.region.TiStore; import org.tikv.common.util.*; import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; import org.tikv.raw.RawKVClient; import org.tikv.raw.SmartRawKVClient; import org.tikv.service.failsafe.CircuitBreaker; @@ -86,7 +87,9 @@ public TiSession(TiConfiguration conf) { if (this.enableGrpcForward) { logger.info("enable grpc forward for high available"); } - warmUp(); + if (conf.isWarmUpEnable() && conf.isRawKVMode()) { + warmUp(); + } this.circuitBreaker = new CircuitBreakerImpl(conf); logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } @@ -104,19 +107,40 @@ private synchronized void warmUp() { this.regionManager.updateStore( null, new TiStore(this.client.getStore(backOffer, store.getId()))); } - ByteString startKey = ByteString.EMPTY; + // use scan region to load region cache with limit + ByteString startKey = ByteString.EMPTY; do { - TiRegion region = regionManager.getRegionByKey(startKey, backOffer); - startKey = region.getEndKey(); + List regions = + regionManager.scanRegions( + ConcreteBackOffer.newGetBackOff(), + startKey, + ByteString.EMPTY, + conf.getScanRegionsLimit()); + if (regions == null || regions.isEmpty()) { + // something went wrong, but the warm-up process could continue + break; + } + for (Pdpb.Region region : regions) { + regionManager.insertRegionToCache( + regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); + } + startKey = regions.get(regions.size() - 1).getRegion().getEndKey(); } while (!startKey.isEmpty()); - RawKVClient rawKVClient = createRawClient(); - ByteString exampleKey = ByteString.EMPTY; - ByteString prev = rawKVClient.get(exampleKey); - rawKVClient.delete(exampleKey); - rawKVClient.putIfAbsent(exampleKey, prev); - rawKVClient.put(exampleKey, prev); + try (RawKVClient rawKVClient = createRawClient()) { + ByteString exampleKey = ByteString.EMPTY; + ByteString prev = rawKVClient.get(exampleKey); + if (prev != null) { + rawKVClient.delete(exampleKey); + rawKVClient.putIfAbsent(exampleKey, prev); + rawKVClient.put(exampleKey, prev); + } else { + rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY); + rawKVClient.put(exampleKey, ByteString.EMPTY); + rawKVClient.delete(exampleKey); + } + } } catch (Exception e) { // ignore error logger.info("warm up fails, ignored ", e); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 24b9acabe70..d18246ff0ef 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -21,6 +21,7 @@ import com.google.protobuf.ByteString; import io.prometheus.client.Histogram; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,6 +42,7 @@ import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Peer; import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; @SuppressWarnings("UnstableApiUsage") public class RegionManager { @@ -50,6 +52,11 @@ public class RegionManager { .name("client_java_get_region_by_requests_latency") .help("getRegionByKey request latency.") .register(); + public static final Histogram SCAN_REGIONS_REQUEST_LATENCY = + Histogram.build() + .name("client_java_scan_regions_request_latency") + .help("scanRegions request latency.") + .register(); // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 @@ -91,6 +98,20 @@ public ReadOnlyPDClient getPDClient() { return this.pdClient; } + public List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { + Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer(); + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions"); + try { + return pdClient.scanRegions(backOffer, startKey, endKey, limit); + } catch (Exception e) { + return new ArrayList<>(); + } finally { + requestTimer.observeDuration(); + slowLogSpan.end(); + } + } + public TiRegion getRegionByKey(ByteString key) { return getRegionByKey(key, defaultBackOff()); } diff --git a/src/test/java/org/tikv/common/TiSessionTest.java b/src/test/java/org/tikv/common/TiSessionTest.java index 908beaad834..7c560e8a35b 100644 --- a/src/test/java/org/tikv/common/TiSessionTest.java +++ b/src/test/java/org/tikv/common/TiSessionTest.java @@ -13,11 +13,14 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.BaseRawKVTest; import org.tikv.common.region.TiRegion; import org.tikv.raw.RawKVClient; public class TiSessionTest extends BaseRawKVTest { + private static final Logger logger = LoggerFactory.getLogger(TiSessionTest.class); private TiSession session; @After @@ -122,4 +125,31 @@ private void doCloseTest(boolean now, long timeoutMS) throws Exception { assertTrue(e.getMessage().contains("rejected from java.util.concurrent.ThreadPoolExecutor")); } } + + @Test + public void warmUpTest() throws Exception { + TiConfiguration conf = createTiConfiguration(); + conf.setWarmUpEnable(true); + long t0 = doTest(conf); + conf.setWarmUpEnable(false); + long t1 = doTest(conf); + assertTrue(t0 < t1); + } + + private long doTest(TiConfiguration conf) throws Exception { + session = TiSession.create(conf); + long start = System.currentTimeMillis(); + try (RawKVClient client = session.createRawClient()) { + client.get(ByteString.EMPTY); + } + long end = System.currentTimeMillis(); + logger.info( + "[warm up " + + (conf.isWarmUpEnable() ? "enabled" : "disabled") + + "] duration " + + (end - start) + + "ms"); + session.close(); + return end - start; + } } From ce5b016e3584b41258d00d8069d6cb7d898f2662 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Tue, 28 Dec 2021 15:16:54 +0800 Subject: [PATCH 2/3] fix Signed-off-by: birdstorm --- src/main/java/org/tikv/common/TiConfiguration.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 0c4c3b3416e..71069f37b37 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -585,6 +585,14 @@ public KVMode getKvMode() { return kvMode; } + public boolean isRawKVMode() { + return getKvMode() == TiConfiguration.KVMode.RAW; + } + + public boolean isTxnKVMode() { + return getKvMode() == KVMode.TXN; + } + public TiConfiguration setKvMode(String kvMode) { this.kvMode = KVMode.valueOf(kvMode); return this; From 2e3a08ddeb31de11551cde89f54ea01d922e0498 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Wed, 29 Dec 2021 15:23:52 +0800 Subject: [PATCH 3/3] merge master Signed-off-by: birdstorm --- src/main/java/org/tikv/common/TiSession.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index c081d915fa1..f1f9790945e 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -113,10 +113,7 @@ private synchronized void warmUp() { do { List regions = regionManager.scanRegions( - ConcreteBackOffer.newGetBackOff(), - startKey, - ByteString.EMPTY, - conf.getScanRegionsLimit()); + backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit()); if (regions == null || regions.isEmpty()) { // something went wrong, but the warm-up process could continue break;