Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.raw.SmartRawKVClient;
import org.tikv.service.failsafe.CircuitBreaker;
import org.tikv.service.failsafe.CircuitBreakerImpl;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;

Expand Down Expand Up @@ -68,6 +70,7 @@ public class TiSession implements AutoCloseable {
private volatile boolean isClosed = false;
private volatile SwitchTiKVModeClient switchTiKVModeClient;
private final MetricsServer metricsServer;
private final CircuitBreaker circuitBreaker;
private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6;

public TiSession(TiConfiguration conf) {
Expand Down Expand Up @@ -98,6 +101,7 @@ public TiSession(TiConfiguration conf) {
logger.info("enable grpc forward for high available");
}
warmUp();
this.circuitBreaker = new CircuitBreakerImpl(conf);
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}

Expand Down Expand Up @@ -168,7 +172,7 @@ public RawKVClient createRawClient() {

public SmartRawKVClient createSmartRawClient() {
RawKVClient rawKVClient = createRawClient();
return new SmartRawKVClient(rawKVClient, getConf());
return new SmartRawKVClient(rawKVClient, circuitBreaker);
}

public KVClient createKVClient() {
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/org/tikv/raw/SmartRawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.CircuitBreakerOpenException;
import org.tikv.common.util.Pair;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.service.failsafe.CircuitBreaker;
import org.tikv.service.failsafe.CircuitBreakerImpl;

public class SmartRawKVClient implements RawKVClientBase {
private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class);
Expand Down Expand Up @@ -65,9 +63,9 @@ public class SmartRawKVClient implements RawKVClientBase {
private final RawKVClientBase client;
private final CircuitBreaker circuitBreaker;

public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) {
public SmartRawKVClient(RawKVClientBase client, CircuitBreaker breaker) {
this.client = client;
this.circuitBreaker = new CircuitBreakerImpl(conf);
this.circuitBreaker = breaker;
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/org/tikv/raw/SmartRawKVClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class SmartRawKVClientTest extends BaseRawKVTest {
public void setup() {
TiConfiguration conf = createTiConfiguration();
conf.setCircuitBreakEnable(enable);
conf.setEnableAtomicForCAS(enable);
conf.setCircuitBreakAvailabilityWindowInSeconds(windowInSeconds);
conf.setCircuitBreakAvailabilityErrorThresholdPercentage(errorThresholdPercentage);
conf.setCircuitBreakAvailabilityRequestVolumnThreshold(requestVolumeThreshold);
Expand Down Expand Up @@ -73,6 +74,13 @@ public void testCircuitBreaker() throws InterruptedException {
}
}

@Test
public void testMultiClients() throws InterruptedException {
for (int i = 0; i < 10240; i++) {
client = session.createSmartRawClient();
}
}

private void success() {
client.get(ByteString.copyFromUtf8("key"));
}
Expand Down