From 1fa267d9e89427fe0e9cc050ef5e7e8cec01ee8a Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sat, 11 Dec 2021 20:06:53 +0800 Subject: [PATCH 1/4] cherry pick #390 to release-3.1 Signed-off-by: ti-srebot --- src/main/java/org/tikv/common/TiSession.java | 12 +++++++++++- src/main/java/org/tikv/raw/SmartRawKVClient.java | 6 ++---- src/test/java/org/tikv/raw/SmartRawKVClientTest.java | 8 ++++++++ 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 52960ce6852..d3ba02f0762 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -42,6 +42,8 @@ import org.tikv.kvproto.Metapb; import org.tikv.raw.RawKVClient; import org.tikv.raw.SmartRawKVClient; +import org.tikv.service.failsafe.CircuitBreaker; +import org.tikv.service.failsafe.CircuitBreakerImpl; import org.tikv.txn.KVClient; import org.tikv.txn.TxnKVClient; @@ -69,7 +71,14 @@ public class TiSession implements AutoCloseable { private volatile boolean enableGrpcForward; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private volatile boolean isClosed = false; +<<<<<<< HEAD private MetricsServer metricsServer; +======= + private volatile SwitchTiKVModeClient switchTiKVModeClient; + private final MetricsServer metricsServer; + private final CircuitBreaker circuitBreaker; + private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6; +>>>>>>> faf1da7... [fix #389] move CircuitBreaker to TiSession (#390) public TiSession(TiConfiguration conf) { // may throw org.tikv.common.MetricsServer - http server not up @@ -84,6 +93,7 @@ public TiSession(TiConfiguration conf) { logger.info("enable grpc forward for high available"); } warmUp(); + this.circuitBreaker = new CircuitBreakerImpl(conf); logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } @@ -150,7 +160,7 @@ public RawKVClient createRawClient() { public SmartRawKVClient createSmartRawClient() { RawKVClient rawKVClient = createRawClient(); - return new SmartRawKVClient(rawKVClient, getConf()); + return new SmartRawKVClient(rawKVClient, circuitBreaker); } public KVClient createKVClient() { diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index a9c4f4aa4c0..02e75625fc5 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -22,12 +22,10 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration; import org.tikv.common.exception.CircuitBreakerOpenException; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; import org.tikv.service.failsafe.CircuitBreaker; -import org.tikv.service.failsafe.CircuitBreakerImpl; public class SmartRawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); @@ -63,9 +61,9 @@ public class SmartRawKVClient implements RawKVClientBase { private final RawKVClientBase client; private final CircuitBreaker circuitBreaker; - public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) { + public SmartRawKVClient(RawKVClientBase client, CircuitBreaker breaker) { this.client = client; - this.circuitBreaker = new CircuitBreakerImpl(conf); + this.circuitBreaker = breaker; } @Override diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java index b4c0bc55880..f7be3f69552 100644 --- a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -27,6 +27,7 @@ public class SmartRawKVClientTest { public void setup() { TiConfiguration conf = TiConfiguration.createRawDefault(); conf.setCircuitBreakEnable(enable); + conf.setEnableAtomicForCAS(enable); conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds); conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage); conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold); @@ -72,6 +73,13 @@ public void testCircuitBreaker() throws InterruptedException { } } + @Test + public void testMultiClients() throws InterruptedException { + for (int i = 0; i < 10240; i++) { + client = session.createSmartRawClient(); + } + } + private void success() { client.get(ByteString.copyFromUtf8("key")); } From a51c7fced89abc4f8cdf0ead2f093f258b02f0c1 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sat, 11 Dec 2021 20:09:37 +0800 Subject: [PATCH 2/4] resolve conflicts Signed-off-by: Jian Zhang --- src/main/java/org/tikv/common/TiSession.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index d3ba02f0762..0cc4b3329d3 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -71,14 +71,8 @@ public class TiSession implements AutoCloseable { private volatile boolean enableGrpcForward; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private volatile boolean isClosed = false; -<<<<<<< HEAD private MetricsServer metricsServer; -======= - private volatile SwitchTiKVModeClient switchTiKVModeClient; - private final MetricsServer metricsServer; private final CircuitBreaker circuitBreaker; - private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6; ->>>>>>> faf1da7... [fix #389] move CircuitBreaker to TiSession (#390) public TiSession(TiConfiguration conf) { // may throw org.tikv.common.MetricsServer - http server not up From cd71722be40b86fbf906952244e412764edfa154 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Sat, 11 Dec 2021 20:13:14 +0800 Subject: [PATCH 3/4] remote throws InterruptedException Signed-off-by: marsishandsome --- src/test/java/org/tikv/raw/SmartRawKVClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java index f7be3f69552..2d2c335d36d 100644 --- a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -74,7 +74,7 @@ public void testCircuitBreaker() throws InterruptedException { } @Test - public void testMultiClients() throws InterruptedException { + public void testMultiClients() { for (int i = 0; i < 10240; i++) { client = session.createSmartRawClient(); } From 421bacb35a5a8ab449616b7cf66e6b58fb7c661b Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Sat, 11 Dec 2021 20:17:56 +0800 Subject: [PATCH 4/4] fix test Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/ConfigUtils.java | 2 +- src/test/java/org/tikv/raw/SmartRawKVClientTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 507db0228b4..ff2d8664551 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -28,7 +28,7 @@ public class ConfigUtils { public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size"; public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size"; public static final String TIKV_GRPC_IDLE_TIMEOUT = "tikv.grpc.idle_timeout"; - + public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size"; public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency"; public static final String TIKV_TABLE_SCAN_CONCURRENCY = "tikv.table.scan_concurrency"; diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java index 2d2c335d36d..6249f84e95e 100644 --- a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -27,7 +27,6 @@ public class SmartRawKVClientTest { public void setup() { TiConfiguration conf = TiConfiguration.createRawDefault(); conf.setCircuitBreakEnable(enable); - conf.setEnableAtomicForCAS(enable); conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds); conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage); conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold);