Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class ConfigUtils {
public static final String TIKV_PD_ADDRESSES = "tikv.pd.addresses";
public static final String TIKV_GRPC_TIMEOUT = "tikv.grpc.timeout_in_ms";
public static final String TIKV_GRPC_FORWARD_TIMEOUT = "tikv.grpc.forward_timeout_in_ms";
public static final String TIKV_GRPC_WARM_UP_TIMEOUT = "tikv.grpc.warm_up_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_TIMEOUT = "tikv.grpc.scan_timeout_in_ms";
public static final String TIKV_GRPC_SCAN_BATCH_SIZE = "tikv.grpc.scan_batch_size";
public static final String TIKV_GRPC_MAX_FRAME_SIZE = "tikv.grpc.max_frame_size";
Expand Down Expand Up @@ -88,10 +89,14 @@ public class ConfigUtils {
public static final String TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT =
"tikv.circuit_break.trigger.attempt_request_count";

public static final String TIKV_SCAN_REGIONS_LIMIT = "tikv.scan_regions_limit";

public static final String TIFLASH_ENABLE = "tiflash.enable";
public static final String TIKV_WARM_UP_ENABLE = "tikv.warm_up.enable";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
public static final String DEF_TIKV_GRPC_WARM_UP_TIMEOUT = "5000ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 100;
public static final int DEF_HEALTH_CHECK_PERIOD_DURATION = 300;
Expand Down Expand Up @@ -148,11 +153,14 @@ public class ConfigUtils {
public static final String LEADER_AND_FOLLOWER = "LEADER_AND_FOLLOWER";

public static final int DEF_TIKV_GRPC_IDLE_TIMEOUT = 60;
public static final boolean DEF_TIKV_WARM_UP_ENABLE = true;

public static final boolean DEF_TiKV_CIRCUIT_BREAK_ENABLE = false;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_WINDOW_IN_SECONDS = 60;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_ERROR_THRESHOLD_PERCENTAGE = 100;
public static final int DEF_TiKV_CIRCUIT_BREAK_AVAILABILITY_REQUST_VOLUMN_THRESHOLD = 10;
public static final int DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS = 20;
public static final int DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT = 10;

public static final int DEF_TIKV_SCAN_REGIONS_LIMIT = 1000;
}
21 changes: 21 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,27 @@ public Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long
return new Pair<Metapb.Region, Metapb.Peer>(decodeRegion(resp.getRegion()), resp.getLeader());
}

@Override
public List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
// no need to backoff because ScanRegions is just for optimization
// introduce a warm-up timeout for ScanRegions requests
PDGrpc.PDBlockingStub stub =
getBlockingStub().withDeadlineAfter(conf.getWarmUpTimeout(), TimeUnit.MILLISECONDS);
Pdpb.ScanRegionsRequest request =
Pdpb.ScanRegionsRequest.newBuilder()
.setHeader(header)
.setStartKey(startKey)
.setEndKey(endKey)
.setLimit(limit)
.build();
Pdpb.ScanRegionsResponse resp = stub.scanRegions(request);
if (resp == null) {
return null;
}
return resp.getRegionsList();
}

private Supplier<GetStoreRequest> buildGetStoreReq(long storeId) {
return () -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/ReadOnlyPDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Pdpb;

/** Readonly PD client including only reading related interface Supposed for TiDB-like use cases */
public interface ReadOnlyPDClient {
Expand All @@ -48,6 +49,9 @@ public interface ReadOnlyPDClient {
*/
Pair<Metapb.Region, Metapb.Peer> getRegionByID(BackOffer backOffer, long id);

List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit);

HostMapping getHostMapping();

/**
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_PD_ADDRESSES, DEF_PD_ADDRESSES);
setIfMissing(TIKV_GRPC_TIMEOUT, DEF_TIMEOUT);
setIfMissing(TIKV_GRPC_FORWARD_TIMEOUT, DEF_FORWARD_TIMEOUT);
setIfMissing(TIKV_GRPC_WARM_UP_TIMEOUT, DEF_TIKV_GRPC_WARM_UP_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_TIMEOUT, DEF_SCAN_TIMEOUT);
setIfMissing(TIKV_GRPC_SCAN_BATCH_SIZE, DEF_SCAN_BATCH_SIZE);
setIfMissing(TIKV_GRPC_MAX_FRAME_SIZE, DEF_MAX_FRAME_SIZE);
Expand Down Expand Up @@ -113,6 +114,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);
setIfMissing(TIKV_GRPC_IDLE_TIMEOUT, DEF_TIKV_GRPC_IDLE_TIMEOUT);
setIfMissing(TIKV_WARM_UP_ENABLE, DEF_TIKV_WARM_UP_ENABLE);
setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS);
Expand All @@ -135,6 +137,7 @@ private static void loadFromDefaultProperties() {
TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS, DEF_TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
setIfMissing(
TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT, DEF_TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);
setIfMissing(TIKV_SCAN_REGIONS_LIMIT, DEF_TIKV_SCAN_REGIONS_LIMIT);
}

public static void listAll() {
Expand Down Expand Up @@ -297,6 +300,7 @@ private static ReplicaRead getReplicaRead(String key) {

private long timeout = getTimeAsMs(TIKV_GRPC_TIMEOUT);
private long forwardTimeout = getTimeAsMs(TIKV_GRPC_FORWARD_TIMEOUT);
private long warmUpTimeout = getTimeAsMs(TIKV_GRPC_WARM_UP_TIMEOUT);
private long scanTimeout = getTimeAsMs(TIKV_GRPC_SCAN_TIMEOUT);
private int maxFrameSize = getInt(TIKV_GRPC_MAX_FRAME_SIZE);
private List<URI> pdAddrs = getPdAddrs(TIKV_PD_ADDRESSES);
Expand Down Expand Up @@ -342,6 +346,7 @@ private static ReplicaRead getReplicaRead(String key) {
private Integer rawKVBatchWriteSlowLogInMS =
getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS).orElse(null);
private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
private boolean warmUpEnable = getBoolean(TIKV_WARM_UP_ENABLE);
private int idleTimeout = getInt(TIKV_GRPC_IDLE_TIMEOUT);
private boolean circuitBreakEnable = getBoolean(TiKV_CIRCUIT_BREAK_ENABLE);
private int circuitBreakAvailabilityWindowInSeconds =
Expand All @@ -353,6 +358,8 @@ private static ReplicaRead getReplicaRead(String key) {
private int circuitBreakSleepWindowInSeconds = getInt(TiKV_CIRCUIT_BREAK_SLEEP_WINDOW_IN_SECONDS);
private int circuitBreakAttemptRequestCount = getInt(TiKV_CIRCUIT_BREAK_ATTEMPT_REQUEST_COUNT);

private int scanRegionsLimit = getInt(TIKV_SCAN_REGIONS_LIMIT);

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -427,6 +434,15 @@ public TiConfiguration setForwardTimeout(long timeout) {
return this;
}

public long getWarmUpTimeout() {
return warmUpTimeout;
}

public TiConfiguration setWarmUpTimeout(long timeout) {
this.warmUpTimeout = timeout;
return this;
}

public long getScanTimeout() {
return scanTimeout;
}
Expand Down Expand Up @@ -569,6 +585,14 @@ public KVMode getKvMode() {
return kvMode;
}

public boolean isRawKVMode() {
return getKvMode() == TiConfiguration.KVMode.RAW;
}

public boolean isTxnKVMode() {
return getKvMode() == KVMode.TXN;
}

public TiConfiguration setKvMode(String kvMode) {
this.kvMode = KVMode.valueOf(kvMode);
return this;
Expand Down Expand Up @@ -679,6 +703,14 @@ public void setIdleTimeout(int timeout) {
this.idleTimeout = timeout;
}

public boolean isWarmUpEnable() {
return warmUpEnable;
}

public void setWarmUpEnable(boolean warmUpEnable) {
this.warmUpEnable = warmUpEnable;
}

public int getRawKVReadTimeoutInMS() {
return rawKVReadTimeoutInMS;
}
Expand Down Expand Up @@ -819,4 +851,12 @@ public int getCircuitBreakAttemptRequestCount() {
public void setCircuitBreakAttemptRequestCount(int circuitBreakAttemptRequestCount) {
this.circuitBreakAttemptRequestCount = circuitBreakAttemptRequestCount;
}

public int getScanRegionsLimit() {
return scanRegionsLimit;
}

public void setScanRegionsLimit(int scanRegionsLimit) {
this.scanRegionsLimit = scanRegionsLimit;
}
}
41 changes: 31 additions & 10 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
import org.tikv.raw.RawKVClient;
import org.tikv.raw.SmartRawKVClient;
import org.tikv.service.failsafe.CircuitBreaker;
Expand Down Expand Up @@ -86,7 +87,9 @@ public TiSession(TiConfiguration conf) {
if (this.enableGrpcForward) {
logger.info("enable grpc forward for high available");
}
warmUp();
if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
warmUp();
}
this.circuitBreaker = new CircuitBreakerImpl(conf);
logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
}
Expand All @@ -104,19 +107,37 @@ private synchronized void warmUp() {
this.regionManager.updateStore(
null, new TiStore(this.client.getStore(backOffer, store.getId())));
}
ByteString startKey = ByteString.EMPTY;

// use scan region to load region cache with limit
ByteString startKey = ByteString.EMPTY;
do {
TiRegion region = regionManager.getRegionByKey(startKey, backOffer);
startKey = region.getEndKey();
List<Pdpb.Region> regions =
regionManager.scanRegions(
backOffer, startKey, ByteString.EMPTY, conf.getScanRegionsLimit());
if (regions == null || regions.isEmpty()) {
// something went wrong, but the warm-up process could continue
break;
}
for (Pdpb.Region region : regions) {
regionManager.insertRegionToCache(
regionManager.createRegion(region.getRegion(), ConcreteBackOffer.newGetBackOff()));
}
startKey = regions.get(regions.size() - 1).getRegion().getEndKey();
} while (!startKey.isEmpty());

RawKVClient rawKVClient = createRawClient();
ByteString exampleKey = ByteString.EMPTY;
ByteString prev = rawKVClient.get(exampleKey);
rawKVClient.delete(exampleKey);
rawKVClient.putIfAbsent(exampleKey, prev);
rawKVClient.put(exampleKey, prev);
try (RawKVClient rawKVClient = createRawClient()) {
ByteString exampleKey = ByteString.EMPTY;
ByteString prev = rawKVClient.get(exampleKey);
if (prev != null) {
rawKVClient.delete(exampleKey);
rawKVClient.putIfAbsent(exampleKey, prev);
rawKVClient.put(exampleKey, prev);
} else {
rawKVClient.putIfAbsent(exampleKey, ByteString.EMPTY);
rawKVClient.put(exampleKey, ByteString.EMPTY);
rawKVClient.delete(exampleKey);
}
}
} catch (Exception e) {
// ignore error
logger.info("warm up fails, ignored ", e);
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.protobuf.ByteString;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -41,6 +42,7 @@
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.StoreState;
import org.tikv.kvproto.Pdpb;

@SuppressWarnings("UnstableApiUsage")
public class RegionManager {
Expand All @@ -50,6 +52,11 @@ public class RegionManager {
.name("client_java_get_region_by_requests_latency")
.help("getRegionByKey request latency.")
.register();
public static final Histogram SCAN_REGIONS_REQUEST_LATENCY =
Histogram.build()
.name("client_java_scan_regions_request_latency")
.help("scanRegions request latency.")
.register();

// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
Expand Down Expand Up @@ -91,6 +98,20 @@ public ReadOnlyPDClient getPDClient() {
return this.pdClient;
}

public List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions");
try {
return pdClient.scanRegions(backOffer, startKey, endKey, limit);
} catch (Exception e) {
return new ArrayList<>();
} finally {
requestTimer.observeDuration();
slowLogSpan.end();
}
}

public TiRegion getRegionByKey(ByteString key) {
return getRegionByKey(key, defaultBackOff());
}
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/org/tikv/common/TiSessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.BaseRawKVTest;
import org.tikv.common.region.TiRegion;
import org.tikv.raw.RawKVClient;

public class TiSessionTest extends BaseRawKVTest {
private static final Logger logger = LoggerFactory.getLogger(TiSessionTest.class);
private TiSession session;

@After
Expand Down Expand Up @@ -122,4 +125,31 @@ private void doCloseTest(boolean now, long timeoutMS) throws Exception {
assertTrue(e.getMessage().contains("rejected from java.util.concurrent.ThreadPoolExecutor"));
}
}

@Test
public void warmUpTest() throws Exception {
TiConfiguration conf = createTiConfiguration();
conf.setWarmUpEnable(true);
long t0 = doTest(conf);
conf.setWarmUpEnable(false);
long t1 = doTest(conf);
assertTrue(t0 < t1);
}

private long doTest(TiConfiguration conf) throws Exception {
session = TiSession.create(conf);
long start = System.currentTimeMillis();
try (RawKVClient client = session.createRawClient()) {
client.get(ByteString.EMPTY);
}
long end = System.currentTimeMillis();
logger.info(
"[warm up "
+ (conf.isWarmUpEnable() ? "enabled" : "disabled")
+ "] duration "
+ (end - start)
+ "ms");
session.close();
return end - start;
}
}