Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ConfigUtils {
public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size";
public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size";
public static final String TIKV_GRPC_IDLE_TIMEOUT = "tikv.grpc.idle_timeout";

public static final String TIKV_INDEX_SCAN_BATCH_SIZE = "tikv.index.scan_batch_size";
public static final String TIKV_INDEX_SCAN_CONCURRENCY = "tikv.index.scan_concurrency";
public static final String TIKV_TABLE_SCAN_CONCURRENCY = "tikv.table.scan_concurrency";
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.raw.SmartRawKVClient;
import org.tikv.service.failsafe.CircuitBreaker;
import org.tikv.service.failsafe.CircuitBreakerImpl;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;

Expand Down Expand Up @@ -70,6 +72,7 @@ public class TiSession implements AutoCloseable {
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private volatile boolean isClosed = false;
private MetricsServer metricsServer;
private final CircuitBreaker circuitBreaker;

public TiSession(TiConfiguration conf) {
// may throw org.tikv.common.MetricsServer - http server not up
Expand All @@ -84,6 +87,7 @@ public TiSession(TiConfiguration conf) {
logger.info("enable grpc forward for high available");
}
warmUp();
this.circuitBreaker = new CircuitBreakerImpl(conf);
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}

Expand Down Expand Up @@ -150,7 +154,7 @@ public RawKVClient createRawClient() {

public SmartRawKVClient createSmartRawClient() {
RawKVClient rawKVClient = createRawClient();
return new SmartRawKVClient(rawKVClient, getConf());
return new SmartRawKVClient(rawKVClient, circuitBreaker);
}

public KVClient createKVClient() {
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/org/tikv/raw/SmartRawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.CircuitBreakerOpenException;
import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.service.failsafe.CircuitBreaker;
import org.tikv.service.failsafe.CircuitBreakerImpl;

public class SmartRawKVClient implements RawKVClientBase {
private static final Logger logger = LoggerFactory.getLogger(SmartRawKVClient.class);
Expand Down Expand Up @@ -63,9 +61,9 @@ public class SmartRawKVClient implements RawKVClientBase {
private final RawKVClientBase client;
private final CircuitBreaker circuitBreaker;

public SmartRawKVClient(RawKVClientBase client, TiConfiguration conf) {
public SmartRawKVClient(RawKVClientBase client, CircuitBreaker breaker) {
this.client = client;
this.circuitBreaker = new CircuitBreakerImpl(conf);
this.circuitBreaker = breaker;
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions src/test/java/org/tikv/raw/SmartRawKVClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ public void testCircuitBreaker() throws InterruptedException {
}
}

@Test
public void testMultiClients() {
for (int i = 0; i < 10240; i++) {
client = session.createSmartRawClient();
}
}

private void success() {
client.get(ByteString.copyFromUtf8("key"));
}
Expand Down