From cd1a3539229f78665c1118c9adf1015b5e39b55e Mon Sep 17 00:00:00 2001 From: birdstorm Date: Fri, 10 Dec 2021 04:11:30 +0800 Subject: [PATCH 1/7] use scanRegions request to warm up client Signed-off-by: birdstorm --- .../java/org/tikv/common/ConfigUtils.java | 4 ++++ src/main/java/org/tikv/common/PDClient.java | 19 +++++++++++++++++ .../org/tikv/common/ReadOnlyPDClient.java | 4 ++++ .../java/org/tikv/common/TiConfiguration.java | 11 ++++++++++ src/main/java/org/tikv/common/TiSession.java | 17 ++++++++++----- .../org/tikv/common/region/RegionManager.java | 21 +++++++++++++++++++ 6 files changed, 71 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 940fe646f8a..69014e6fd9e 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -104,6 +104,8 @@ public class ConfigUtils { public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = "tikv.circuit_break.trigger.attempt_request_count"; + public static final String TIKV_SCAN_REGIONS_LIMIT = "tikv.scan_regions_limit"; + public static final String TIFLASH_ENABLE = "tiflash.enable"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; @@ -180,4 +182,6 @@ public class ConfigUtils { public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10; public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20; public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10; + + public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000; } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index a4c3837ca6f..c5e66f4df9d 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -341,6 +341,25 @@ public Pair getRegionByID(BackOffer backOffer, long return new Pair(decodeRegion(resp.getRegion()), resp.getLeader()); } + @Override + public List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { + Supplier request = + () -> + Pdpb.ScanRegionsRequest.newBuilder() + .setHeader(header) + .setStartKey(startKey) + .setEndKey(endKey) + .setLimit(limit) + .build(); + PDErrorHandler handler = + new PDErrorHandler<>(r -> buildFromPdpbError(r.getHeader().getError()), this); + + Pdpb.ScanRegionsResponse resp = + callWithRetry(backOffer, PDGrpc.getScanRegionsMethod(), request, handler); + return resp.getRegionsList(); + } + private Supplier buildGetStoreReq(long storeId) { return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build(); } diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 63518b9377b..802d258bc8f 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -22,6 +22,7 @@ import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Store; +import org.tikv.kvproto.Pdpb; /** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */ public interface ReadOnlyPDClient { @@ -48,6 +49,9 @@ public interface ReadOnlyPDClient { */ Pair getRegionByID(BackOffer backOffer, long id); + List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit); + HostMapping getHostMapping(); /** diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 4cdb48a777f..9cd07704cce 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -144,6 +144,7 @@ private static void loadFromDefaultProperties() { TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); setIfMissing( TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT); } public static void listAll() { @@ -384,6 +385,8 @@ private static ReplicaRead getReplicaRead(String key) { private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS); private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT); + private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT); + public enum KVMode { TXN, RAW @@ -958,4 +961,12 @@ public int getCircuitBreakAttemptRequestCount() { public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) { this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount; } + + public int getScanRegionsLimit() { + return scanRegionsLimit; + } + + public void setScanRegionsLimit(int scanRegionsLimit) { + this.scanRegionsLimit = scanRegionsLimit; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 6b3298ea40b..2dc8e081369 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -36,6 +36,7 @@ import org.tikv.common.util.*; import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Pdpb; import org.tikv.raw.RawKVClient; import org.tikv.raw.SmartRawKVClient; import org.tikv.txn.KVClient; @@ -109,12 +110,18 @@ private synchronized void warmUp() { null, new TiStore(this.client.getStore(ConcreteBackOffer.newGetBackOff(), store.getId()))); } - ByteString startKey = ByteString.EMPTY; - do { - TiRegion region = regionManager.getRegionByKey(startKey); - startKey = region.getEndKey(); - } while (!startKey.isEmpty()); + // use scan region to load region cache with limit + List regions = + regionManager.scanRegions( + ConcreteBackOffer.newGetBackOff(), + ByteString.EMPTY, + ByteString.EMPTY, + conf.getScanRegionsLimit()); + for (Pdpb.Region region : regions) { + regionManager.insertRegionToCache( + regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); + } RawKVClient rawKVClient = createRawClient(); ByteString exampleKey = ByteString.EMPTY; diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index b9fc08d73fd..0d22f430550 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -21,6 +21,7 @@ import com.google.protobuf.ByteString; import io.prometheus.client.Histogram; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,6 +42,7 @@ import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Peer; import org.tikv.kvproto.Metapb.StoreState; +import org.tikv.kvproto.Pdpb; @SuppressWarnings("UnstableApiUsage") public class RegionManager { @@ -50,6 +52,11 @@ public class RegionManager { .name("client_java_get_region_by_requests_latency") .help("getRegionByKey request latency.") .register(); + public static final Histogram SCAN_REGIONS_REQUEST_LATENCY = + Histogram.build() + .name("client_java_scan_regions_request_latency") + .help("scanRegions request latency.") + .register(); // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 @@ -95,6 +102,20 @@ public void invalidateAll() { cache.invalidateAll(); } + public List scanRegions( + BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { + Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer(); + SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions"); + try { + return pdClient.scanRegions(backOffer, startKey, endKey, limit); + } catch (Exception e) { + return new ArrayList<>(); + } finally { + requestTimer.observeDuration(); + slowLogSpan.end(); + } + } + public TiRegion getRegionByKey(ByteString key) { return getRegionByKey(key, defaultBackOff()); } From d83030171d1d1a924f97fcbb8f38a7e27326998a Mon Sep 17 00:00:00 2001 From: birdstorm Date: Fri, 10 Dec 2021 04:25:10 +0800 Subject: [PATCH 2/7] fetch regions using while loop Signed-off-by: birdstorm --- src/main/java/org/tikv/common/TiSession.java | 24 ++++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 2dc8e081369..a854eb7d61f 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -112,16 +112,20 @@ private synchronized void warmUp() { } // use scan region to load region cache with limit - List regions = - regionManager.scanRegions( - ConcreteBackOffer.newGetBackOff(), - ByteString.EMPTY, - ByteString.EMPTY, - conf.getScanRegionsLimit()); - for (Pdpb.Region region : regions) { - regionManager.insertRegionToCache( - regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); - } + ByteString startKey = ByteString.EMPTY; + do { + List regions = regionManager.scanRegions( + ConcreteBackOffer.newGetBackOff(), + startKey, + ByteString.EMPTY, + conf.getScanRegionsLimit()); + for (Pdpb.Region region : regions) { + regionManager.insertRegionToCache( + regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); + } + startKey = regions.get(regions.size() - 1).getRegion().getEndKey(); + } while (!startKey.isEmpty()); + RawKVClient rawKVClient = createRawClient(); ByteString exampleKey = ByteString.EMPTY; From 57e4be8e511fd6758d5e4a662f1db6f07b998fe4 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Mon, 13 Dec 2021 18:00:14 +0800 Subject: [PATCH 3/7] format Signed-off-by: birdstorm --- src/main/java/org/tikv/common/TiSession.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index afad03a3698..54fc02ede2b 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -122,11 +122,12 @@ private synchronized void warmUp() { // use scan region to load region cache with limit ByteString startKey = ByteString.EMPTY; do { - List regions = regionManager.scanRegions( - ConcreteBackOffer.newGetBackOff(), - startKey, - ByteString.EMPTY, - conf.getScanRegionsLimit()); + List regions = + regionManager.scanRegions( + ConcreteBackOffer.newGetBackOff(), + startKey, + ByteString.EMPTY, + conf.getScanRegionsLimit()); for (Pdpb.Region region : regions) { regionManager.insertRegionToCache( regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); @@ -134,7 +135,6 @@ private synchronized void warmUp() { startKey = regions.get(regions.size() - 1).getRegion().getEndKey(); } while (!startKey.isEmpty()); - RawKVClient rawKVClient = createRawClient(); ByteString exampleKey = ByteString.EMPTY; Optional prev = rawKVClient.get(exampleKey); From 8046590dffc3bf3d1f729db69b05f0fb90d286ee Mon Sep 17 00:00:00 2001 From: birdstorm Date: Wed, 15 Dec 2021 10:58:40 +0800 Subject: [PATCH 4/7] tiny modification Signed-off-by: birdstorm --- src/main/java/org/tikv/common/PDClient.java | 16 ++++++++-------- src/main/java/org/tikv/common/TiSession.java | 4 ++++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index c5e66f4df9d..d3c5d99de68 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -344,19 +344,19 @@ public Pair getRegionByID(BackOffer backOffer, long @Override public List scanRegions( BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { - Supplier request = - () -> + // no need to backoff because ScanRegions is just for optimization + PDGrpc.PDBlockingStub stub = getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + Pdpb.ScanRegionsRequest request = Pdpb.ScanRegionsRequest.newBuilder() .setHeader(header) .setStartKey(startKey) .setEndKey(endKey) .setLimit(limit) .build(); - PDErrorHandler handler = - new PDErrorHandler<>(r -> buildFromPdpbError(r.getHeader().getError()), this); - - Pdpb.ScanRegionsResponse resp = - callWithRetry(backOffer, PDGrpc.getScanRegionsMethod(), request, handler); + Pdpb.ScanRegionsResponse resp = stub.scanRegions(request); + if (resp == null) { + return null; + } return resp.getRegionsList(); } @@ -434,7 +434,7 @@ private GetMembersResponse getMembers(URI uri) { PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); GetMembersRequest request = - GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build(); + GetMembersRequest.newBuilder().setHeader(header).build(); GetMembersResponse resp = stub.getMembers(request); // check if the response contains a valid leader if (resp != null && resp.getLeader().getMemberId() == 0) { diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 54fc02ede2b..b5ea4c110b9 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -128,6 +128,10 @@ private synchronized void warmUp() { startKey, ByteString.EMPTY, conf.getScanRegionsLimit()); + if (regions == null || regions.isEmpty()) { + // something went wrong, but the warm-up process could continue + break; + } for (Pdpb.Region region : regions) { regionManager.insertRegionToCache( regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff())); From 4c906cd874b57563b84e66d01dca9ec0424c7d53 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Wed, 15 Dec 2021 11:11:00 +0800 Subject: [PATCH 5/7] format Signed-off-by: birdstorm --- .../java/org/tikv/common/ConfigUtils.java | 2 ++ src/main/java/org/tikv/common/PDClient.java | 19 +++++++-------- .../java/org/tikv/common/TiConfiguration.java | 11 +++++++++ src/main/java/org/tikv/common/TiSession.java | 23 ++++++++++--------- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index fd4d67f127f..e21321abe2f 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -25,6 +25,7 @@ public class ConfigUtils { public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms"; public static final String TIKV_GRPC_INGEST_TIMEOUT = "tikv.grpc.ingest_timeout_in_ms"; public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms"; + public static final String TIKV_GRPC_WARM_UP_TIMEOUT = "tikv.grpc.warm_up_timeout_in_ms"; public static final String TIKV_PD_FIRST_GET_MEMBER_TIMEOUT = "tikv.grpc.pd_first_get_member_timeout_in_ms"; public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms"; @@ -112,6 +113,7 @@ public class ConfigUtils { public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s"; public static final String DEF_FORWARD_TIMEOUT = "300ms"; + public static final String DEF_TIKV_GRPC_WARM_UP_TIMEOUT = "5000ms"; public static final String DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT = "10000ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; public static final int DEF_CHECK_HEALTH_TIMEOUT = 100; diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index d3c5d99de68..a1fe40027ae 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -345,14 +345,16 @@ public Pair getRegionByID(BackOffer backOffer, long public List scanRegions( BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { // no need to backoff because ScanRegions is just for optimization - PDGrpc.PDBlockingStub stub = getBlockingStub().withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + // introduce a warm-up timeout for ScanRegions requests + PDGrpc.PDBlockingStub stub = + getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS); Pdpb.ScanRegionsRequest request = - Pdpb.ScanRegionsRequest.newBuilder() - .setHeader(header) - .setStartKey(startKey) - .setEndKey(endKey) - .setLimit(limit) - .build(); + Pdpb.ScanRegionsRequest.newBuilder() + .setHeader(header) + .setStartKey(startKey) + .setEndKey(endKey) + .setLimit(limit) + .build(); Pdpb.ScanRegionsResponse resp = stub.scanRegions(request); if (resp == null) { return null; @@ -433,8 +435,7 @@ private GetMembersResponse getMembers(URI uri) { ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); - GetMembersRequest request = - GetMembersRequest.newBuilder().setHeader(header).build(); + GetMembersRequest request = GetMembersRequest.newBuilder().setHeader(header).build(); GetMembersResponse resp = stub.getMembers(request); // check if the response contains a valid leader if (resp != null && resp.getLeader().getMemberId() == 0) { diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 2347cc4842c..153d6ba7cdb 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -86,6 +86,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT); setIfMissing(TIKV_GRPC_INGEST_TIMEOUT, DEF_TIKV_GRPC_INGEST_TIMEOUT); setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT); + setIfMissing(TIKV_GRPC_WARM_UP_TIMEOUT, DEF_TIKV_GRPC_WARM_UP_TIMEOUT); setIfMissing(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT, DEF_TIKV_PD_FIRST_GET_MEMBER_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT); setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE); @@ -309,6 +310,7 @@ private static ReplicaRead getReplicaRead(String key) { private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT); private long ingestTimeout = getTimeAsMs(TIKV_GRPC_INGEST_TIMEOUT); private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT); + private long warmUpTimeout = getTimeAsMs(TIKV_GRPC_WARM_UP_TIMEOUT); private long pdFirstGetMemberTimeout = getTimeAsMs(TIKV_PD_FIRST_GET_MEMBER_TIMEOUT); private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT); private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE); @@ -471,6 +473,15 @@ public TiConfiguration setForwardTimeout(long timeout) { return this; } + public long getWarmUpTimeout() { + return warmUpTimeout; + } + + public TiConfiguration setWarmUpTimeout(long timeout) { + this.warmUpTimeout = timeout; + return this; + } + public long getPdFirstGetMemberTimeout() { return pdFirstGetMemberTimeout; } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index b5ea4c110b9..3e4972b4d9f 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -139,17 +139,18 @@ private synchronized void warmUp() { startKey = regions.get(regions.size() - 1).getRegion().getEndKey(); } while (!startKey.isEmpty()); - RawKVClient rawKVClient = createRawClient(); - ByteString exampleKey = ByteString.EMPTY; - Optional prev = rawKVClient.get(exampleKey); - if (prev.isPresent()) { - rawKVClient.delete(exampleKey); - rawKVClient.putIfAbsent(exampleKey, prev.get()); - rawKVClient.put(exampleKey, prev.get()); - } else { - rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY); - rawKVClient.put(exampleKey, ByteString.EMPTY); - rawKVClient.delete(exampleKey); + try (RawKVClient rawKVClient = createRawClient()) { + ByteString exampleKey = ByteString.EMPTY; + Optional prev = rawKVClient.get(exampleKey); + if (prev.isPresent()) { + rawKVClient.delete(exampleKey); + rawKVClient.putIfAbsent(exampleKey, prev.get()); + rawKVClient.put(exampleKey, prev.get()); + } else { + rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY); + rawKVClient.put(exampleKey, ByteString.EMPTY); + rawKVClient.delete(exampleKey); + } } } catch (Exception e) { // ignore error From c36ea10a089bb3baf49a6ac4649e1c851a000ad0 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Wed, 15 Dec 2021 12:03:29 +0800 Subject: [PATCH 6/7] add test Signed-off-by: birdstorm --- .../java/org/tikv/common/ConfigUtils.java | 2 ++ src/main/java/org/tikv/common/PDClient.java | 3 +- .../java/org/tikv/common/TiConfiguration.java | 10 +++++++ src/main/java/org/tikv/common/TiSession.java | 4 ++- src/test/java/org/tikv/BaseRawKVTest.java | 1 + .../java/org/tikv/common/TiSessionTest.java | 28 +++++++++++++++++++ 6 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index e21321abe2f..aa6b75b87f9 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -109,6 +109,7 @@ public class ConfigUtils { public static final String TIKV_SCAN_REGIONS_LIMIT = "tikv.scan_regions_limit"; public static final String TIFLASH_ENABLE = "tiflash.enable"; + public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_TIKV_GRPC_INGEST_TIMEOUT = "200s"; @@ -179,6 +180,7 @@ public class ConfigUtils { public static final int DEF_TIKV_GRPC_IDLE_TIMEOUT = 60; public static final boolean DEF_TIKV_TLS_ENABLE = false; public static final boolean DEF_TIFLASH_ENABLE = false; + public static final boolean DEF_TIKV_WARM_UP_ENABLE = true; public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false; public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60; diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index a1fe40027ae..b609c85804f 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -435,7 +435,8 @@ private GetMembersResponse getMembers(URI uri) { ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan).withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); - GetMembersRequest request = GetMembersRequest.newBuilder().setHeader(header).build(); + GetMembersRequest request = + GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build(); GetMembersResponse resp = stub.getMembers(request); // check if the response contains a valid leader if (resp != null && resp.getLeader().getMemberId() == 0) { diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 153d6ba7cdb..e8beea3a653 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -124,6 +124,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_GRPC_IDLE_TIMEOUT, DEF_TIKV_GRPC_IDLE_TIMEOUT); setIfMissing(TIKV_TLS_ENABLE, DEF_TIKV_TLS_ENABLE); setIfMissing(TIFLASH_ENABLE, DEF_TIFLASH_ENABLE); + setIfMissing(TIKV_WARM_UP_ENABLE, DEF_TIKV_WARM_UP_ENABLE); setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS); setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS); @@ -372,6 +373,7 @@ private static ReplicaRead getReplicaRead(String key) { private String keyFile = getOption(TIKV_KEY_FILE).orElse(null); private boolean tiFlashEnable = getBoolean(TIFLASH_ENABLE); + private boolean warmUpEnable = getBoolean(TIKV_WARM_UP_ENABLE); private boolean isTest = false; @@ -818,6 +820,14 @@ public boolean isTiFlashEnabled() { return tiFlashEnable; } + public boolean isWarmUpEnable() { + return warmUpEnable; + } + + public void setWarmUpEnable(boolean warmUpEnable) { + this.warmUpEnable = warmUpEnable; + } + public boolean isTlsEnable() { return tlsEnable; } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 3e4972b4d9f..619f2e38355 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -101,7 +101,9 @@ public TiSession(TiConfiguration conf) { if (this.enableGrpcForward) { logger.info("enable grpc forward for high available"); } - warmUp(); + if (conf.isWarmUpEnable() && conf.isRawKVMode()) { + warmUp(); + } this.circuitBreaker = new CircuitBreakerImpl(conf); logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } diff --git a/src/test/java/org/tikv/BaseRawKVTest.java b/src/test/java/org/tikv/BaseRawKVTest.java index b24b981ad72..7b6fa1aefa0 100644 --- a/src/test/java/org/tikv/BaseRawKVTest.java +++ b/src/test/java/org/tikv/BaseRawKVTest.java @@ -12,6 +12,7 @@ protected TiConfiguration createTiConfiguration() { ? TiConfiguration.createRawDefault() : TiConfiguration.createRawDefault(pdAddrsStr); conf.setTest(true); + conf.setEnableAtomicForCAS(true); conf.setEnableGrpcForward(false); return conf; } diff --git a/src/test/java/org/tikv/common/TiSessionTest.java b/src/test/java/org/tikv/common/TiSessionTest.java index 1cbc2ced5bd..62ec6e979d8 100644 --- a/src/test/java/org/tikv/common/TiSessionTest.java +++ b/src/test/java/org/tikv/common/TiSessionTest.java @@ -14,11 +14,14 @@ import org.junit.After; import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.BaseRawKVTest; import org.tikv.common.region.TiRegion; import org.tikv.raw.RawKVClient; public class TiSessionTest extends BaseRawKVTest { + private static final Logger logger = LoggerFactory.getLogger(TiSessionTest.class); private TiSession session; @After @@ -122,4 +125,29 @@ private void doCloseTest(boolean now, long timeoutMS) throws Exception { assertTrue(e.getMessage().contains("rejected from java.util.concurrent.ThreadPoolExecutor")); } } + + @Test + public void warmUpTest() throws Exception { + TiConfiguration conf = createTiConfiguration(); + conf.setWarmUpEnable(true); + doTest(conf); + conf.setWarmUpEnable(false); + doTest(conf); + } + + private void doTest(TiConfiguration conf) throws Exception { + session = TiSession.create(conf); + long start = System.currentTimeMillis(); + try (RawKVClient client = session.createRawClient()) { + client.get(ByteString.EMPTY); + } + long end = System.currentTimeMillis(); + logger.info( + "[warm up " + + (conf.isWarmUpEnable() ? "enabled" : "disabled") + + "] duration " + + (end - start) + + "ms"); + session.close(); + } } From f2de748f1bdb0c821c9c3d07b282d67f2ff20856 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Thu, 23 Dec 2021 11:36:39 +0800 Subject: [PATCH 7/7] add assert Signed-off-by: birdstorm --- src/test/java/org/tikv/common/TiSessionTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/tikv/common/TiSessionTest.java b/src/test/java/org/tikv/common/TiSessionTest.java index 62ec6e979d8..8599e9510d4 100644 --- a/src/test/java/org/tikv/common/TiSessionTest.java +++ b/src/test/java/org/tikv/common/TiSessionTest.java @@ -130,12 +130,13 @@ private void doCloseTest(boolean now, long timeoutMS) throws Exception { public void warmUpTest() throws Exception { TiConfiguration conf = createTiConfiguration(); conf.setWarmUpEnable(true); - doTest(conf); + long t0 = doTest(conf); conf.setWarmUpEnable(false); - doTest(conf); + long t1 = doTest(conf); + assertTrue(t0 < t1); } - private void doTest(TiConfiguration conf) throws Exception { + private long doTest(TiConfiguration conf) throws Exception { session = TiSession.create(conf); long start = System.currentTimeMillis(); try (RawKVClient client = session.createRawClient()) { @@ -149,5 +150,6 @@ private void doTest(TiConfiguration conf) throws Exception { + (end - start) + "ms"); session.close(); + return end - start; } }