Skip to content
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/org/tikv/cdc/CDCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ private synchronized void addRegions(final Iterable<TiRegion> regions, final lon
for (final TiRegion region : regions) {
if (overlapWithRegion(region)) {
final String address =
session.getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress();
session
.getRegionManager()
.getStoreById(region.getLeader().getStoreId())
.getStore()
.getAddress();
final ManagedChannel channel =
session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping());
try {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ public class ConfigUtils {
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";

public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
public static final int DEF_CHECK_HEALTH_TIMEOUT = 40;
public static final int DEF_SCAN_BATCH_SIZE = 10240;
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
Expand All @@ -76,6 +79,7 @@ public class ConfigUtils {
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why enable by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not?


public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand All @@ -258,6 +259,7 @@ public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key)
new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand Down Expand Up @@ -285,6 +287,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand All @@ -299,6 +302,7 @@ public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
new TiRegion(
resp.getRegion(),
resp.getLeader(),
null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.tikv.common.ConfigUtils.*;

import io.grpc.Metadata;
import java.io.Serializable;
import java.net.URI;
import java.util.*;
Expand All @@ -32,6 +33,8 @@ public class TiConfiguration implements Serializable {

private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap<String, String> settings = new ConcurrentHashMap<>();
public static final Metadata.Key FORWARD_META_DATA_KEY =

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final Metadata.Key FORWARD_META_DATA_KEY =
public static final Metadata.Key FORWARD_METADATA_KEY =

Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);

static {
loadFromSystemProperties();
Expand Down Expand Up @@ -72,6 +75,8 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
}

public static void listAll() {
Expand Down Expand Up @@ -245,6 +250,7 @@ private static ReplicaRead getReplicaRead(String key) {
private boolean showRowId = getBoolean(TIKV_SHOW_ROWID);
private String dbPrefix = get(TIKV_DB_PREFIX);
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
private boolean enableGrpcForward = getBoolean(TIKV_ENABLE_GRPC_FORWARD);

private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);
Expand All @@ -253,6 +259,7 @@ private static ReplicaRead getReplicaRead(String key) {

private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);

private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);

Expand Down Expand Up @@ -532,4 +539,12 @@ public TiConfiguration setMetricsPort(int metricsPort) {
public String getNetworkMappingName() {
return this.networkMappingName;
}

public boolean getEnableGrpcForward() {
return this.enableGrpcForward;
}

public long getGrpcHealthCheckTimeout() {
return this.grpcHealthCheckTimeout;
}
}
15 changes: 11 additions & 4 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
Expand All @@ -71,6 +71,7 @@ public class TiSession implements AutoCloseable {
private volatile ExecutorService batchScanThreadPool;
private volatile ExecutorService deleteRangeThreadPool;
private volatile RegionManager regionManager;
private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private boolean isClosed = false;
private HTTPServer server;
Expand All @@ -80,6 +81,7 @@ public TiSession(TiConfiguration conf) {
this.conf = conf;
this.channelFactory = new ChannelFactory(conf.getMaxFrameSize());
this.client = PDClient.createRaw(conf, channelFactory);
this.enableGrpcForward = conf.getEnableGrpcForward();
if (conf.isMetricsEnable()) {
try {
this.collectorRegistry = new CollectorRegistry();
Expand Down Expand Up @@ -199,7 +201,12 @@ public synchronized RegionManager getRegionManager() {
if (res == null) {
synchronized (this) {
if (regionManager == null) {
regionManager = new RegionManager(getPDClient(), this.cacheInvalidateCallback);
regionManager =
new RegionManager(
getPDClient(),
this.cacheInvalidateCallback,
this.channelFactory,
this.enableGrpcForward);
}
res = regionManager;
}
Expand Down Expand Up @@ -415,10 +422,10 @@ private List<TiRegion> splitRegion(List<ByteString> splitKeys, BackOffer backOff
groupKeysByRegion(regionManager, splitKeys, backOffer);
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {

Pair<TiRegion, Metapb.Store> pair =
Pair<TiRegion, TiStore> pair =
getRegionManager().getRegionStorePairByKey(entry.getKey().getStartKey());
TiRegion region = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
List<ByteString> splits =
entry
.getValue()
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/org/tikv/common/operation/RegionErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
retry =
newRegion != null
&& recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion);
retry = newRegion != null && recv.onNotLeader(newRegion);

backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
Expand Down Expand Up @@ -107,7 +105,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {

this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
// recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
Expand Down Expand Up @@ -169,7 +166,11 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {

@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
regionManager.onRequestFail(recv.getRegion());
if (recv.onStoreUnreachable()) {
return true;
} else {
regionManager.onRequestFail(recv.getRegion());
}

backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;

public class ConcreteScanIterator extends ScanIterator {
private final long version;
Expand Down Expand Up @@ -82,10 +82,10 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {

private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
logger.warn(String.format("resolve current key error %s", current.getError().toString()));
Pair<TiRegion, Metapb.Store> pair =
Pair<TiRegion, TiStore> pair =
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first;
Metapb.Store store = pair.second;
TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import org.tikv.common.operation.SchemaInfer;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Metapb;

public abstract class DAGIterator<T>
extends org.tikv.common.operation.iterator.CoprocessorIterator<T> {
Expand Down Expand Up @@ -204,7 +204,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
}
List<Coprocessor.KeyRange> ranges = task.getRanges();
TiRegion region = task.getRegion();
Metapb.Store store = task.getStore();
TiStore store = task.getStore();

try {
RegionStoreClient client =
Expand Down Expand Up @@ -245,7 +245,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
private Iterator<SelectResponse> processByStreaming(RangeSplitter.RegionTask regionTask) {
List<Coprocessor.KeyRange> ranges = regionTask.getRanges();
TiRegion region = regionTask.getRegion();
Metapb.Store store = regionTask.getStore();
TiStore store = regionTask.getStore();

RegionStoreClient client;
try {
Expand Down
Loading