diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 58377b88807..c9d1aec0d1c 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -38,6 +38,8 @@ import org.tikv.kvproto.Metapb; import org.tikv.raw.RawKVClient; import org.tikv.raw.SmartRawKVClient; +import org.tikv.service.failsafe.CircuitBreaker; +import org.tikv.service.failsafe.CircuitBreakerImpl; import org.tikv.txn.KVClient; import org.tikv.txn.TxnKVClient; @@ -68,6 +70,7 @@ public class TiSession implements AutoCloseable { private volatile boolean isClosed = false; private volatile SwitchTiKVModeClient switchTiKVModeClient; private final MetricsServer metricsServer; + private final CircuitBreaker circuitBreaker; private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6; public TiSession(TiConfiguration conf) { @@ -98,6 +101,7 @@ public TiSession(TiConfiguration conf) { logger.info("enable grpc forward for high available"); } warmUp(); + this.circuitBreaker = new CircuitBreakerImpl(conf); logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); } @@ -168,7 +172,7 @@ public RawKVClient createRawClient() { public SmartRawKVClient createSmartRawClient() { RawKVClient rawKVClient = createRawClient(); - return new SmartRawKVClient(rawKVClient, getConf()); + return new SmartRawKVClient(rawKVClient, circuitBreaker); } public KVClient createKVClient() { diff --git a/src/main/java/org/tikv/raw/SmartRawKVClient.java b/src/main/java/org/tikv/raw/SmartRawKVClient.java index 33a8981e81b..2c00b388e0d 100644 --- a/src/main/java/org/tikv/raw/SmartRawKVClient.java +++ b/src/main/java/org/tikv/raw/SmartRawKVClient.java @@ -23,13 +23,11 @@ import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration; import org.tikv.common.exception.CircuitBreakerOpenException; import org.tikv.common.util.Pair; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; import org.tikv.service.failsafe.CircuitBreaker; -import org.tikv.service.failsafe.CircuitBreakerImpl; public class SmartRawKVClient implements RawKVClientBase { private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class); @@ -65,9 +63,9 @@ public class SmartRawKVClient implements RawKVClientBase { private final RawKVClientBase client; private final CircuitBreaker circuitBreaker; - public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) { + public SmartRawKVClient(RawKVClientBase client, CircuitBreaker breaker) { this.client = client; - this.circuitBreaker = new CircuitBreakerImpl(conf); + this.circuitBreaker = breaker; } @Override diff --git a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java index 61bb70d6507..bd4dd67cf18 100644 --- a/src/test/java/org/tikv/raw/SmartRawKVClientTest.java +++ b/src/test/java/org/tikv/raw/SmartRawKVClientTest.java @@ -28,6 +28,7 @@ public class SmartRawKVClientTest extends BaseRawKVTest { public void setup() { TiConfiguration conf = createTiConfiguration(); conf.setCircuitBreakEnable(enable); + conf.setEnableAtomicForCAS(enable); conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds); conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage); conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold); @@ -73,6 +74,13 @@ public void testCircuitBreaker() throws InterruptedException { } } + @Test + public void testMultiClients() throws InterruptedException { + for (int i = 0; i < 10240; i++) { + client = session.createSmartRawClient(); + } + } + private void success() { client.get(ByteString.copyFromUtf8("key")); }