Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ public class ConfigUtils {
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";

public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 40;
public static final int DEF_SCAN_BATCH_SIZE = 10240;
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
Expand All @@ -76,6 +79,7 @@ public class ConfigUtils {
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand All @@ -258,6 +259,7 @@ public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key)
new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand Down Expand Up @@ -285,6 +287,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand All @@ -299,6 +302,7 @@ public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.tikv.common.ConfigUtils.*;

import io.grpc.Metadata;
import java.io.Serializable;
import java.net.URI;
import java.util.*;
Expand All @@ -32,6 +33,8 @@ public class TiConfiguration implements Serializable {

private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key FORWARD_META_DATA_KEY =
Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);

static {
loadFromSystemProperties();
Expand Down Expand Up @@ -72,6 +75,8 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
}

public static void listAll() {
Expand Down Expand Up @@ -245,6 +250,7 @@ private static ReplicaRead getReplicaRead(String key) {
private boolean showRowId = getBoolean(TIKV_SHOW_ROWID);
private String dbPrefix = get(TIKV_DB_PREFIX);
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
private boolean enableGrpcForward = getBoolean(TIKV_ENABLE_GRPC_FORWARD);

private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);
Expand All @@ -253,6 +259,7 @@ private static ReplicaRead getReplicaRead(String key) {

private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);

private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);

Expand Down Expand Up @@ -532,4 +539,12 @@ public TiConfiguration setMetricsPort(int metricsPort) {
public String getNetworkMappingName() {
return this.networkMappingName;
}

public boolean getEnableGrpcForward() {
return this.enableGrpcForward;
}

public long getGrpcHealthCheckTimeout() {
return this.grpcHealthCheckTimeout;
}
}
15 changes: 11 additions & 4 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
Expand All @@ -71,6 +71,7 @@ public class TiSession implements AutoCloseable {
private volatile ExecutorService batchScanThreadPool;
private volatile ExecutorService deleteRangeThreadPool;
private volatile RegionManager regionManager;
private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private boolean isClosed = false;
private HTTPServer server;
Expand All @@ -80,6 +81,7 @@ public TiSession(TiConfiguration conf) {
this.conf = conf;
this.channelFactory = new ChannelFactory(conf.getMaxFrameSize());
this.client = PDClient.createRaw(conf, channelFactory);
this.enableGrpcForward = conf.getEnableGrpcForward();
if (conf.isMetricsEnable()) {
try {
this.collectorRegistry = new CollectorRegistry();
Expand Down Expand Up @@ -199,7 +201,12 @@ public synchronized RegionManager getRegionManager() {
if (res == null) {
synchronized (this) {
if (regionManager == null) {
regionManager = new RegionManager(getPDClient(), this.cacheInvalidateCallback);
regionManager =
new RegionManager(
getPDClient(),
this.cacheInvalidateCallback,
this.channelFactory,
this.enableGrpcForward);
}
res = regionManager;
}
Expand Down Expand Up @@ -415,10 +422,10 @@ private List<TiRegion> splitRegion(List<ByteString> splitKeys, BackOffer backOff
groupKeysByRegion(regionManager, splitKeys, backOffer);
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {

Pair<TiRegion, Metapb.Store> pair =
Pair<TiRegion, TiStore> pair =
getRegionManager().getRegionStorePairByKey(entry.getKey().getStartKey());
TiRegion region = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
List<ByteString> splits =
entry
.getValue()
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/org/tikv/common/operation/KVErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry =
newRegion != null
&& recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion);
retry = newRegion != null && recv.onNotLeader(newRegion);

backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;

public class ConcreteScanIterator extends ScanIterator {
private final long version;
Expand Down Expand Up @@ -82,10 +82,10 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {

private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
logger.warn(String.format("resolve current key error %s", current.getError().toString()));
Pair<TiRegion, Metapb.Store> pair =
Pair<TiRegion, TiStore> pair =
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import org.tikv.common.operation.SchemaInfer;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Metapb;

public abstract class DAGIterator<T>
extends org.tikv.common.operation.iterator.CoprocessorIterator<T> {
Expand Down Expand Up @@ -204,7 +204,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
}
List<Coprocessor.KeyRange> ranges = task.getRanges();
TiRegion region = task.getRegion();
Metapb.Store store = task.getStore();
TiStore store = task.getStore();

try {
RegionStoreClient client =
Expand Down Expand Up @@ -246,7 +246,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
private Iterator<SelectResponse> processByStreaming(RangeSplitter.RegionTask regionTask) {
List<Coprocessor.KeyRange> ranges = regionTask.getRanges();
TiRegion region = regionTask.getRegion();
Metapb.Store store = regionTask.getStore();
TiStore store = regionTask.getStore();

RegionStoreClient client;
try {
Expand Down
Loading