diff --git a/pom.xml b/pom.xml
index 4e4cff5be70..c67ed6b7585 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,6 +126,11 @@
grpc-stub
${grpc.version}
+
+ io.grpc
+ grpc-services
+ ${grpc.version}
+
io.grpc
grpc-testing
diff --git a/src/main/java/org/tikv/cdc/CDCClient.java b/src/main/java/org/tikv/cdc/CDCClient.java
index 1617773d505..c6ee84a3526 100644
--- a/src/main/java/org/tikv/cdc/CDCClient.java
+++ b/src/main/java/org/tikv/cdc/CDCClient.java
@@ -149,7 +149,11 @@ private synchronized void addRegions(final Iterable regions, final lon
for (final TiRegion region : regions) {
if (overlapWithRegion(region)) {
final String address =
- session.getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress();
+ session
+ .getRegionManager()
+ .getStoreById(region.getLeader().getStoreId())
+ .getStore()
+ .getAddress();
final ManagedChannel channel =
session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping());
try {
diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java
index 6df036d4e05..f4c06598ba2 100644
--- a/src/main/java/org/tikv/common/ConfigUtils.java
+++ b/src/main/java/org/tikv/common/ConfigUtils.java
@@ -48,10 +48,13 @@ public class ConfigUtils {
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
+ public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
+ public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
+ public static final int DEF_CHECK_HEALTH_TIMEOUT = 40;
public static final int DEF_SCAN_BATCH_SIZE = 10240;
public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB
public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000;
@@ -76,6 +79,7 @@ public class ConfigUtils {
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
+ public static final boolean DEF_GRPC_FORWARD_ENABLE = true;
public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java
index ac4d4f39182..20693547d39 100644
--- a/src/main/java/org/tikv/common/PDClient.java
+++ b/src/main/java/org/tikv/common/PDClient.java
@@ -241,6 +241,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
+ null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@@ -258,6 +259,7 @@ public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key)
new TiRegion(
resp.getRegion(),
resp.getLeader(),
+ null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@@ -285,6 +287,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
return new TiRegion(
resp.getRegion(),
resp.getLeader(),
+ null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@@ -299,6 +302,7 @@ public Future getRegionByIDAsync(BackOffer backOffer, long id) {
new TiRegion(
resp.getRegion(),
resp.getLeader(),
+ null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java
index 0e97028d06f..c38fd44bbc7 100644
--- a/src/main/java/org/tikv/common/TiConfiguration.java
+++ b/src/main/java/org/tikv/common/TiConfiguration.java
@@ -17,6 +17,7 @@
import static org.tikv.common.ConfigUtils.*;
+import io.grpc.Metadata;
import java.io.Serializable;
import java.net.URI;
import java.util.*;
@@ -32,6 +33,8 @@ public class TiConfiguration implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class);
private static final ConcurrentHashMap settings = new ConcurrentHashMap<>();
+ public static final Metadata.Key FORWARD_META_DATA_KEY =
+ Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER);
static {
loadFromSystemProperties();
@@ -72,6 +75,8 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
+ setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
+ setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
}
public static void listAll() {
@@ -245,6 +250,7 @@ private static ReplicaRead getReplicaRead(String key) {
private boolean showRowId = getBoolean(TIKV_SHOW_ROWID);
private String dbPrefix = get(TIKV_DB_PREFIX);
private KVMode kvMode = getKvMode(TIKV_KV_MODE);
+ private boolean enableGrpcForward = getBoolean(TIKV_ENABLE_GRPC_FORWARD);
private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY);
private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ);
@@ -253,6 +259,7 @@ private static ReplicaRead getReplicaRead(String key) {
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
+ private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT);
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
@@ -532,4 +539,12 @@ public TiConfiguration setMetricsPort(int metricsPort) {
public String getNetworkMappingName() {
return this.networkMappingName;
}
+
+ public boolean getEnableGrpcForward() {
+ return this.enableGrpcForward;
+ }
+
+ public long getGrpcHealthCheckTimeout() {
+ return this.grpcHealthCheckTimeout;
+ }
}
diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java
index 41410d64d3b..749db0dc6c8 100644
--- a/src/main/java/org/tikv/common/TiSession.java
+++ b/src/main/java/org/tikv/common/TiSession.java
@@ -43,8 +43,8 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
-import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient;
import org.tikv.txn.TxnKVClient;
@@ -71,6 +71,7 @@ public class TiSession implements AutoCloseable {
private volatile ExecutorService batchScanThreadPool;
private volatile ExecutorService deleteRangeThreadPool;
private volatile RegionManager regionManager;
+ private volatile boolean enableGrpcForward;
private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder;
private boolean isClosed = false;
private HTTPServer server;
@@ -80,6 +81,7 @@ public TiSession(TiConfiguration conf) {
this.conf = conf;
this.channelFactory = new ChannelFactory(conf.getMaxFrameSize());
this.client = PDClient.createRaw(conf, channelFactory);
+ this.enableGrpcForward = conf.getEnableGrpcForward();
if (conf.isMetricsEnable()) {
try {
this.collectorRegistry = new CollectorRegistry();
@@ -199,7 +201,12 @@ public synchronized RegionManager getRegionManager() {
if (res == null) {
synchronized (this) {
if (regionManager == null) {
- regionManager = new RegionManager(getPDClient(), this.cacheInvalidateCallback);
+ regionManager =
+ new RegionManager(
+ getPDClient(),
+ this.cacheInvalidateCallback,
+ this.channelFactory,
+ this.enableGrpcForward);
}
res = regionManager;
}
@@ -415,10 +422,10 @@ private List splitRegion(List splitKeys, BackOffer backOff
groupKeysByRegion(regionManager, splitKeys, backOffer);
for (Map.Entry> entry : groupKeys.entrySet()) {
- Pair pair =
+ Pair pair =
getRegionManager().getRegionStorePairByKey(entry.getKey().getStartKey());
TiRegion region = pair.first;
- Metapb.Store store = pair.second;
+ TiStore store = pair.second;
List splits =
entry
.getValue()
diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
index 4137854e752..c783c4f3b30 100644
--- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
+++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
@@ -73,9 +73,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
// onNotLeader is only needed when updateLeader succeeds, thus switch
// to a new store address.
TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId);
- retry =
- newRegion != null
- && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion);
+ retry = newRegion != null && recv.onNotLeader(newRegion);
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
@@ -107,7 +105,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
this.regionManager.invalidateRegion(recv.getRegion());
this.regionManager.invalidateStore(storeId);
- // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
// assume this is a low probability error, do not retry, just re-split the request by
// throwing it out.
return false;
@@ -169,7 +166,11 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
- regionManager.onRequestFail(recv.getRegion());
+ if (recv.onStoreUnreachable()) {
+ return true;
+ } else {
+ regionManager.onRequestFail(recv.getRegion());
+ }
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,
diff --git a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java
index e563296ab51..1f5ac6fcf36 100644
--- a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java
+++ b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java
@@ -27,11 +27,11 @@
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
-import org.tikv.kvproto.Metapb;
public class ConcreteScanIterator extends ScanIterator {
private final long version;
@@ -82,10 +82,10 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) {
logger.warn(String.format("resolve current key error %s", current.getError().toString()));
- Pair pair =
+ Pair pair =
builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first;
- Metapb.Store store = pair.second;
+ TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version);
diff --git a/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java b/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java
index f9dc40536a9..36b9fbc14e5 100644
--- a/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java
+++ b/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java
@@ -32,12 +32,12 @@
import org.tikv.common.operation.SchemaInfer;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;
-import org.tikv.kvproto.Metapb;
public abstract class DAGIterator
extends org.tikv.common.operation.iterator.CoprocessorIterator {
@@ -204,7 +204,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
}
List ranges = task.getRanges();
TiRegion region = task.getRegion();
- Metapb.Store store = task.getStore();
+ TiStore store = task.getStore();
try {
RegionStoreClient client =
@@ -245,7 +245,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
private Iterator processByStreaming(RangeSplitter.RegionTask regionTask) {
List ranges = regionTask.getRanges();
TiRegion region = regionTask.getRegion();
- Metapb.Store store = regionTask.getStore();
+ TiStore store = regionTask.getStore();
RegionStoreClient client;
try {
diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
index 34942688859..a2d90ce47a6 100644
--- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
+++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
@@ -21,6 +21,12 @@
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.stub.MetadataUtils;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
@@ -35,10 +41,12 @@ public abstract class AbstractRegionStoreClient
protected final RegionManager regionManager;
protected TiRegion region;
+ protected TiStore targetStore;
protected AbstractRegionStoreClient(
TiConfiguration conf,
TiRegion region,
+ TiStore store,
ChannelFactory channelFactory,
TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub,
@@ -49,6 +57,7 @@ protected AbstractRegionStoreClient(
checkArgument(region.getLeader() != null, "Leader Peer is null");
this.region = region;
this.regionManager = regionManager;
+ this.targetStore = store;
}
public TiRegion getRegion() {
@@ -71,13 +80,13 @@ public void close() throws GrpcException {}
/**
* onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed
*
- * @param newStore the new store presented by NotLeader Error
+ * @param newRegion the new region presented by NotLeader Error
* @return false when re-split is needed.
*/
@Override
- public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) {
+ public boolean onNotLeader(TiRegion newRegion) {
if (logger.isDebugEnabled()) {
- logger.debug(region + ", new leader = " + newStore.getId());
+ logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId());
}
// When switch leader fails or the region changed its region epoch,
// it would be necessary to re-split task's key range for new region.
@@ -85,7 +94,8 @@ public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) {
return false;
}
region = newRegion;
- String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress();
+ targetStore = regionManager.getStoreById(region.getLeader().getStoreId());
+ String addressStr = targetStore.getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
@@ -94,20 +104,89 @@ public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) {
}
@Override
- public void onStoreNotMatch(Metapb.Store store) {
- String addressStr = store.getAddress();
+ public boolean onStoreUnreachable() {
+ if (!conf.getEnableGrpcForward()) {
+ return false;
+ }
+ if (region.getProxyStore() != null) {
+ TiStore store = region.getProxyStore();
+ if (!checkHealth(store) && store.markUnreachable()) {
+ this.regionManager.scheduleHealthCheckJob(store);
+ }
+ } else {
+ if (!targetStore.isUnreachable()) {
+ if (checkHealth(targetStore)) {
+ return true;
+ } else {
+ if (targetStore.markUnreachable()) {
+ this.regionManager.scheduleHealthCheckJob(targetStore);
+ }
+ }
+ }
+ }
+ TiRegion proxyRegion = switchProxyStore();
+ if (proxyRegion == null) {
+ return false;
+ }
+ regionManager.updateRegion(region, proxyRegion);
+ region = proxyRegion;
+ String addressStr = region.getProxyStore().getStore().getAddress();
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
- blockingStub = TikvGrpc.newBlockingStub(channel);
- asyncStub = TikvGrpc.newStub(channel);
- if (region.getLeader().getStoreId() != store.getId()) {
- logger.warn(
- "store_not_match may occur? "
- + region
- + ", original store = "
- + store.getId()
- + " address = "
- + addressStr);
+ Metadata header = new Metadata();
+ header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress());
+ blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
+ asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
+ return true;
+ }
+
+ private boolean checkHealth(TiStore store) {
+ if (store.getStore() == null) {
+ return false;
+ }
+ String addressStr = store.getStore().getAddress();
+ ManagedChannel channel =
+ channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
+ HealthGrpc.HealthBlockingStub stub =
+ HealthGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(conf.getGrpcHealthCheckTimeout(), TimeUnit.MILLISECONDS);
+ HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
+ try {
+ HealthCheckResponse resp = stub.check(req);
+ if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) {
+ return false;
+ }
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ private TiRegion switchProxyStore() {
+ boolean hasVisitedStore = false;
+ List peers = region.getFollowerList();
+ for (int i = 0; i < peers.size() * 2; i++) {
+ int idx = i % peers.size();
+ Metapb.Peer peer = peers.get(idx);
+ if (peer.getStoreId() != region.getLeader().getStoreId()) {
+ if (region.getProxyStore() == null) {
+ TiStore store = regionManager.getStoreById(peer.getStoreId());
+ if (checkHealth(store)) {
+ return region.switchProxyStore(store);
+ }
+ } else {
+ TiStore proxyStore = region.getProxyStore();
+ if (peer.getStoreId() == proxyStore.getStore().getId()) {
+ hasVisitedStore = true;
+ } else if (hasVisitedStore) {
+ proxyStore = regionManager.getStoreById(peer.getStoreId());
+ if (!proxyStore.isUnreachable() && checkHealth(proxyStore)) {
+ return region.switchProxyStore(proxyStore);
+ }
+ }
+ }
+ }
}
+ return null;
}
}
diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java
index 42c2d6aeba3..4bee1356eab 100644
--- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java
+++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java
@@ -17,12 +17,11 @@
package org.tikv.common.region;
-import org.tikv.kvproto.Metapb.Store;
-
public interface RegionErrorReceiver {
- boolean onNotLeader(Store store, TiRegion region);
+ boolean onNotLeader(TiRegion region);
- void onStoreNotMatch(Store store);
+ /// return whether we need to retry this request.
+ boolean onStoreUnreachable();
TiRegion getRegion();
}
diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java
index 745b53399f4..128509408cc 100644
--- a/src/main/java/org/tikv/common/region/RegionManager.java
+++ b/src/main/java/org/tikv/common/region/RegionManager.java
@@ -28,6 +28,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +40,11 @@
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
import org.tikv.common.util.BackOffer;
+import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Peer;
-import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
@SuppressWarnings("UnstableApiUsage")
@@ -50,6 +53,8 @@ public class RegionManager {
// TODO: the region cache logic need rewrite.
// https://github.com/pingcap/tispark/issues/1170
private final RegionCache cache;
+ private final ScheduledExecutorService executor;
+ private final UnreachableStoreChecker storeChecker;
private final Function cacheInvalidateCallback;
@@ -65,11 +70,33 @@ public RegionManager(
ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) {
this.cache = new RegionCache(pdClient);
this.cacheInvalidateCallback = cacheInvalidateCallback;
+ this.executor = null;
+ this.storeChecker = null;
+ }
+
+ public RegionManager(
+ ReadOnlyPDClient pdClient,
+ Function cacheInvalidateCallback,
+ ChannelFactory channelFactory,
+ boolean enableGrpcForward) {
+ this.cache = new RegionCache(pdClient);
+ this.cacheInvalidateCallback = cacheInvalidateCallback;
+ if (enableGrpcForward) {
+ UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient);
+ this.storeChecker = storeChecker;
+ this.executor = Executors.newScheduledThreadPool(1);
+ this.executor.scheduleAtFixedRate(storeChecker, 5, 5, TimeUnit.SECONDS);
+ } else {
+ this.storeChecker = null;
+ this.executor = null;
+ }
}
public RegionManager(ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(pdClient);
this.cacheInvalidateCallback = null;
+ this.storeChecker = null;
+ this.executor = null;
}
public Function getCacheInvalidateCallback() {
@@ -99,19 +126,19 @@ public TiRegion getRegionById(long regionId) {
return cache.getRegionById(ConcreteBackOffer.newGetBackOff(), regionId);
}
- public Pair getRegionStorePairByKey(ByteString key, BackOffer backOffer) {
+ public Pair getRegionStorePairByKey(ByteString key, BackOffer backOffer) {
return getRegionStorePairByKey(key, TiStoreType.TiKV, backOffer);
}
- public Pair getRegionStorePairByKey(ByteString key) {
+ public Pair getRegionStorePairByKey(ByteString key) {
return getRegionStorePairByKey(key, TiStoreType.TiKV);
}
- public Pair getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
+ public Pair getRegionStorePairByKey(ByteString key, TiStoreType storeType) {
return getRegionStorePairByKey(key, storeType, ConcreteBackOffer.newGetBackOff());
}
- public Pair getRegionStorePairByKey(
+ public Pair getRegionStorePairByKey(
ByteString key, TiStoreType storeType, BackOffer backOffer) {
TiRegion region = cache.getRegionByKey(key, backOffer);
if (region == null) {
@@ -121,7 +148,7 @@ public Pair getRegionStorePairByKey(
throw new TiClientInternalException("Region invalid: " + region.toString());
}
- Store store = null;
+ TiStore store = null;
if (storeType == TiStoreType.TiKV) {
Peer peer = region.getCurrentReplica();
store = cache.getStoreById(peer.getStoreId(), backOffer);
@@ -131,8 +158,8 @@ public Pair getRegionStorePairByKey(
} else {
outerLoop:
for (Peer peer : region.getLearnerList()) {
- Store s = getStoreById(peer.getStoreId(), backOffer);
- for (Metapb.StoreLabel label : s.getLabelsList()) {
+ TiStore s = getStoreById(peer.getStoreId(), backOffer);
+ for (Metapb.StoreLabel label : s.getStore().getLabelsList()) {
if (label.getKey().equals(storeType.getLabelKey())
&& label.getValue().equals(storeType.getLabelValue())) {
store = s;
@@ -154,11 +181,11 @@ public Pair getRegionStorePairByKey(
return Pair.create(region, store);
}
- public Store getStoreById(long id) {
+ public TiStore getStoreById(long id) {
return getStoreById(id, ConcreteBackOffer.newGetBackOff());
}
- public Store getStoreById(long id, BackOffer backOffer) {
+ public TiStore getStoreById(long id, BackOffer backOffer) {
return cache.getStoreById(id, backOffer);
}
@@ -184,6 +211,10 @@ public synchronized TiRegion updateLeader(TiRegion region, long storeId) {
return null;
}
+ public boolean updateRegion(TiRegion oldRegion, TiRegion region) {
+ return cache.updateRegion(oldRegion, region);
+ }
+
/**
* Clears all cache when a TiKV server does not respond
*
@@ -194,8 +225,10 @@ public void onRequestFail(TiRegion region) {
}
private void onRequestFail(TiRegion region, long storeId) {
- cache.invalidateRegion(region);
- cache.invalidateAllRegionForStore(storeId);
+ if (this.storeChecker != null) {
+ cache.invalidateRegion(region);
+ cache.invalidateAllRegionForStore(storeId);
+ }
}
public void invalidateStore(long storeId) {
@@ -206,9 +239,13 @@ public void invalidateRegion(TiRegion region) {
cache.invalidateRegion(region);
}
+ public void scheduleHealthCheckJob(TiStore store) {
+ this.storeChecker.scheduleStoreHealthCheck(store);
+ }
+
public static class RegionCache {
private final Map regionCache;
- private final Map storeCache;
+ private final Map storeCache;
private final RangeMap keyToRegionIdCache;
private final ReadOnlyPDClient pdClient;
@@ -298,6 +335,26 @@ public synchronized void invalidateRegion(TiRegion region) {
}
}
+ public synchronized boolean updateRegion(TiRegion expected, TiRegion region) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("invalidateRegion ID[%s]", region.getId()));
+ }
+ TiRegion oldRegion = regionCache.get(region.getId());
+ if (expected != oldRegion) {
+ return false;
+ } else {
+ if (oldRegion != null) {
+ keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey()));
+ }
+ putRegion(region);
+ return true;
+ }
+ } catch (Exception ignore) {
+ return false;
+ }
+ }
+
public synchronized void invalidateAllRegionForStore(long storeId) {
List regionToRemove = new ArrayList<>();
for (TiRegion r : regionCache.values()) {
@@ -317,16 +374,19 @@ public synchronized void invalidateAllRegionForStore(long storeId) {
}
public synchronized void invalidateStore(long storeId) {
- storeCache.remove(storeId);
+ TiStore store = storeCache.remove(storeId);
+ if (store != null) {
+ store.markReachable();
+ }
}
- public synchronized Store getStoreById(long id, BackOffer backOffer) {
+ public synchronized TiStore getStoreById(long id, BackOffer backOffer) {
try {
- Store store = storeCache.get(id);
+ TiStore store = storeCache.get(id);
if (store == null) {
- store = pdClient.getStore(backOffer, id);
+ store = new TiStore(pdClient.getStore(backOffer, id));
}
- if (store.getState().equals(StoreState.Tombstone)) {
+ if (store.getStore().getState().equals(StoreState.Tombstone)) {
return null;
}
storeCache.put(id, store);
diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java
index e6f37a99495..0be777ab681 100644
--- a/src/main/java/org/tikv/common/region/RegionStoreClient.java
+++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java
@@ -26,6 +26,8 @@
import com.pingcap.tidb.tipb.DAGRequest;
import com.pingcap.tidb.tipb.SelectResponse;
import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.util.*;
import java.util.function.Supplier;
@@ -44,7 +46,6 @@
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb.*;
-import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;
@@ -88,7 +89,7 @@ private synchronized Boolean getIsV4() {
private RegionStoreClient(
TiConfiguration conf,
TiRegion region,
- String storeVersion,
+ TiStore store,
TiStoreType storeType,
ChannelFactory channelFactory,
TikvBlockingStub blockingStub,
@@ -96,15 +97,15 @@ private RegionStoreClient(
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
- super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
+ super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
this.storeType = storeType;
if (this.storeType == TiStoreType.TiKV) {
this.lockResolverClient =
AbstractLockResolverClient.getInstance(
- storeVersion,
conf,
region,
+ store,
this.blockingStub,
this.asyncStub,
channelFactory,
@@ -113,10 +114,10 @@ private RegionStoreClient(
clientBuilder);
} else {
- Store tikvStore =
+ TiStore tikvStore =
regionManager.getRegionStorePairByKey(region.getStartKey(), TiStoreType.TiKV).second;
- String addressStr = tikvStore.getAddress();
+ String addressStr = tikvStore.getStore().getAddress();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
}
@@ -127,9 +128,9 @@ private RegionStoreClient(
this.lockResolverClient =
AbstractLockResolverClient.getInstance(
- tikvStore.getVersion(),
conf,
region,
+ tikvStore,
tikvBlockingStub,
tikvAsyncStub,
channelFactory,
@@ -788,6 +789,7 @@ public List splitRegion(Iterable splitKeys) {
new TiRegion(
region,
null,
+ null,
conf.getIsolationLevel(),
conf.getCommandPriority(),
conf.getKvMode(),
@@ -1244,25 +1246,48 @@ public RegionStoreClientBuilder(
this.pdClient = pdClient;
}
- public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeType)
+ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType storeType)
throws GrpcException {
Objects.requireNonNull(region, "region is null");
Objects.requireNonNull(store, "store is null");
Objects.requireNonNull(storeType, "storeType is null");
- String addressStr = store.getAddress();
+ String addressStr = store.getStore().getAddress();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
}
- ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
-
- TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel);
- TikvStub asyncStub = TikvGrpc.newStub(channel);
+ ManagedChannel channel = null;
+
+ TikvBlockingStub blockingStub = null;
+ TikvStub asyncStub = null;
+
+ if (conf.getEnableGrpcForward() && region.getProxyStore() != null && store.isUnreachable()) {
+ addressStr = region.getProxyStore().getStore().getAddress();
+ channel =
+ channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
+ Metadata header = new Metadata();
+ header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress());
+ blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header);
+ asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header);
+ } else {
+ // If the store is reachable, which is update by check-health thread
+ if (!store.isUnreachable()) {
+ if (region.getProxyStore() != null) {
+ TiRegion newRegion = region.switchProxyStore(null);
+ if (regionManager.updateRegion(region, newRegion)) {
+ region = newRegion;
+ }
+ }
+ }
+ channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
+ blockingStub = TikvGrpc.newBlockingStub(channel);
+ asyncStub = TikvGrpc.newStub(channel);
+ }
return new RegionStoreClient(
conf,
region,
- store.getVersion(),
+ store,
storeType,
channelFactory,
blockingStub,
@@ -1272,7 +1297,8 @@ public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeTy
this);
}
- public synchronized RegionStoreClient build(TiRegion region, Store store) throws GrpcException {
+ public synchronized RegionStoreClient build(TiRegion region, TiStore store)
+ throws GrpcException {
return build(region, store, TiStoreType.TiKV);
}
@@ -1282,12 +1308,12 @@ public synchronized RegionStoreClient build(ByteString key) throws GrpcException
public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType)
throws GrpcException {
- Pair pair = regionManager.getRegionStorePairByKey(key, storeType);
+ Pair pair = regionManager.getRegionStorePairByKey(key, storeType);
return build(pair.first, pair.second, storeType);
}
public synchronized RegionStoreClient build(TiRegion region) throws GrpcException {
- Store store = regionManager.getStoreById(region.getLeader().getStoreId());
+ TiStore store = regionManager.getStoreById(region.getLeader().getStoreId());
return build(region, store, TiStoreType.TiKV);
}
diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java
index 6170d0bbc92..1e8a8c95323 100644
--- a/src/main/java/org/tikv/common/region/TiRegion.java
+++ b/src/main/java/org/tikv/common/region/TiRegion.java
@@ -50,11 +50,13 @@ public class TiRegion implements Serializable {
private final Peer leader;
private final ReplicaSelector replicaSelector;
private final List replicaList;
+ private final TiStore proxyStore;
private int replicaIdx;
public TiRegion(
Region meta,
Peer leader,
+ TiStore proxyStore,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
KVMode kvMode,
@@ -65,6 +67,7 @@ public TiRegion(
this.isolationLevel = isolationLevel;
this.commandPri = commandPri;
this.replicaSelector = replicaSelector;
+ this.proxyStore = proxyStore;
if (leader == null || leader.getId() == 0) {
if (meta.getPeersCount() == 0) {
throw new TiClientInternalException("Empty peer list for region " + meta.getId());
@@ -197,6 +200,10 @@ public RegionVerID getVerID() {
meta.getId(), meta.getRegionEpoch().getConfVer(), meta.getRegionEpoch().getVersion());
}
+ public TiStore getProxyStore() {
+ return proxyStore;
+ }
+
/**
* switches current peer to the one on specific store. It return false if no peer matches the
* storeID.
@@ -209,12 +216,29 @@ public TiRegion switchPeer(long leaderStoreID) {
for (Peer p : peers) {
if (p.getStoreId() == leaderStoreID) {
return new TiRegion(
- this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaSelector);
+ this.meta,
+ p,
+ this.proxyStore,
+ this.isolationLevel,
+ this.commandPri,
+ this.kvMode,
+ this.replicaSelector);
}
}
return null;
}
+ public TiRegion switchProxyStore(TiStore store) {
+ return new TiRegion(
+ this.meta,
+ this.leader,
+ store,
+ this.isolationLevel,
+ this.commandPri,
+ this.kvMode,
+ this.replicaSelector);
+ }
+
public boolean isMoreThan(ByteString key) {
return FastByteComparisons.compareTo(
meta.getStartKey().toByteArray(),
diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java
new file mode 100644
index 00000000000..db1f2f30443
--- /dev/null
+++ b/src/main/java/org/tikv/common/region/TiStore.java
@@ -0,0 +1,34 @@
+package org.tikv.common.region;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.tikv.kvproto.Metapb;
+
+public class TiStore {
+ private final Metapb.Store store;
+ private AtomicBoolean unreachable;
+
+ public TiStore(Metapb.Store store) {
+ this.store = store;
+ this.unreachable = new AtomicBoolean(false);
+ }
+
+ public boolean markUnreachable() {
+ return this.unreachable.compareAndSet(false, true);
+ }
+
+ public void markReachable() {
+ this.unreachable.set(false);
+ }
+
+ public boolean isUnreachable() {
+ return this.unreachable.get();
+ }
+
+ public Metapb.Store getStore() {
+ return this.store;
+ }
+
+ public long getId() {
+ return this.store.getId();
+ }
+}
diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java
new file mode 100644
index 00000000000..2c948b83714
--- /dev/null
+++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java
@@ -0,0 +1,77 @@
+package org.tikv.common.region;
+
+import io.grpc.ManagedChannel;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthGrpc;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.tikv.common.ReadOnlyPDClient;
+import org.tikv.common.util.ChannelFactory;
+
+public class UnreachableStoreChecker implements Runnable {
+ private ConcurrentHashMap stores;
+ private BlockingQueue taskQueue;
+ private final ChannelFactory channelFactory;
+ private final ReadOnlyPDClient pdClient;
+
+ public UnreachableStoreChecker(ChannelFactory channelFactory, ReadOnlyPDClient pdClient) {
+ this.stores = new ConcurrentHashMap();
+ this.taskQueue = new LinkedBlockingQueue<>();
+ this.channelFactory = channelFactory;
+ this.pdClient = pdClient;
+ }
+
+ public void scheduleStoreHealthCheck(TiStore store) {
+ TiStore oldStore = this.stores.get(Long.valueOf(store.getId()));
+ if (oldStore == store) {
+ return;
+ }
+ this.stores.put(Long.valueOf(store.getId()), store);
+ if (!this.taskQueue.add(store)) {
+ // add queue false, mark it reachable so that it can be put again.
+ store.markReachable();
+ }
+ }
+
+ private List getUnhealthStore() {
+ List unhealthStore = new LinkedList<>();
+ while (!this.taskQueue.isEmpty()) {
+ try {
+ TiStore store = this.taskQueue.take();
+ unhealthStore.add(store);
+ } catch (Exception e) {
+ return unhealthStore;
+ }
+ }
+ return unhealthStore;
+ }
+
+ @Override
+ public void run() {
+ List unhealthStore = getUnhealthStore();
+ for (TiStore store : unhealthStore) {
+ if (!store.isUnreachable()) {
+ continue;
+ }
+ String addressStr = store.getStore().getAddress();
+ ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
+ HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel);
+ HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
+ try {
+ HealthCheckResponse resp = stub.check(req);
+ if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
+ store.markReachable();
+ this.stores.remove(Long.valueOf(store.getId()));
+ continue;
+ }
+ this.taskQueue.add(store);
+ } catch (Exception e) {
+ this.taskQueue.add(store);
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/tikv/common/util/RangeSplitter.java b/src/main/java/org/tikv/common/util/RangeSplitter.java
index 4c5187a12a9..475e799f71c 100644
--- a/src/main/java/org/tikv/common/util/RangeSplitter.java
+++ b/src/main/java/org/tikv/common/util/RangeSplitter.java
@@ -29,9 +29,9 @@
import org.tikv.common.pd.PDUtils;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.kvproto.Coprocessor.KeyRange;
-import org.tikv.kvproto.Metapb;
public class RangeSplitter {
private final RegionManager regionManager;
@@ -51,12 +51,11 @@ public static RangeSplitter newSplitter(RegionManager mgr) {
* @param handles Handle list
* @return map
*/
- public Map, TLongArrayList> groupByAndSortHandlesByRegionId(
+ public Map, TLongArrayList> groupByAndSortHandlesByRegionId(
long tableId, TLongArrayList handles) {
TLongObjectHashMap regionHandles = new TLongObjectHashMap<>();
- TLongObjectHashMap> idToRegionStorePair =
- new TLongObjectHashMap<>();
- Map, TLongArrayList> result = new HashMap<>();
+ TLongObjectHashMap> idToRegionStorePair = new TLongObjectHashMap<>();
+ Map, TLongArrayList> result = new HashMap<>();
handles.sort();
byte[] endKey = null;
@@ -71,7 +70,7 @@ public Map, TLongArrayList> groupByAndSortHandlesBy
regionHandles.put(curRegion.getId(), handlesInCurRegion);
handlesInCurRegion = new TLongArrayList();
}
- Pair regionStorePair =
+ Pair regionStorePair =
regionManager.getRegionStorePairByKey(ByteString.copyFrom(key.getBytes()));
curRegion = regionStorePair.first;
idToRegionStorePair.put(curRegion.getId(), regionStorePair);
@@ -84,7 +83,7 @@ public Map, TLongArrayList> groupByAndSortHandlesBy
}
regionHandles.forEachEntry(
(k, v) -> {
- Pair regionStorePair = idToRegionStorePair.get(k);
+ Pair regionStorePair = idToRegionStorePair.get(k);
result.put(regionStorePair, v);
return true;
});
@@ -110,7 +109,7 @@ private List splitAndSortHandlesByRegion(long tableId, TLongArrayLis
// Max value for current index handle range
ImmutableList.Builder regionTasks = ImmutableList.builder();
- Map, TLongArrayList> regionHandlesMap =
+ Map, TLongArrayList> regionHandlesMap =
groupByAndSortHandlesByRegionId(tableId, handles);
regionHandlesMap.forEach((k, v) -> createTask(0, v.size(), tableId, v, k, regionTasks));
@@ -123,7 +122,7 @@ private void createTask(
int endPos,
long tableId,
TLongArrayList handles,
- Pair regionStorePair,
+ Pair regionStorePair,
ImmutableList.Builder regionTasks) {
List newKeyRanges = new ArrayList<>(endPos - startPos + 1);
long startHandle = handles.get(startPos);
@@ -163,10 +162,10 @@ public List splitRangeByRegion(List keyRanges, TiStoreType
int i = 0;
KeyRange range = keyRanges.get(i++);
Map> idToRange = new HashMap<>(); // region id to keyRange list
- Map> idToRegion = new HashMap<>();
+ Map> idToRegion = new HashMap<>();
while (true) {
- Pair regionStorePair =
+ Pair regionStorePair =
regionManager.getRegionStorePairByKey(range.getStart(), storeType);
if (regionStorePair == null) {
@@ -203,7 +202,7 @@ public List splitRangeByRegion(List keyRanges, TiStoreType
ImmutableList.Builder resultBuilder = ImmutableList.builder();
idToRange.forEach(
(k, v) -> {
- Pair regionStorePair = idToRegion.get(k);
+ Pair regionStorePair = idToRegion.get(k);
resultBuilder.add(new RegionTask(regionStorePair.first, regionStorePair.second, v));
});
return resultBuilder.build();
@@ -221,24 +220,23 @@ public List splitRangeByRegion(List keyRanges) {
public static class RegionTask implements Serializable {
private final TiRegion region;
- private final Metapb.Store store;
+ private final TiStore store;
private final List ranges;
private final String host;
- RegionTask(TiRegion region, Metapb.Store store, List ranges) {
+ RegionTask(TiRegion region, TiStore store, List ranges) {
this.region = region;
this.store = store;
this.ranges = ranges;
String host = null;
try {
- host = PDUtils.addrToUri(store.getAddress()).getHost();
+ host = PDUtils.addrToUri(store.getStore().getAddress()).getHost();
} catch (Exception ignored) {
}
this.host = host;
}
- public static RegionTask newInstance(
- TiRegion region, Metapb.Store store, List ranges) {
+ public static RegionTask newInstance(TiRegion region, TiStore store, List ranges) {
return new RegionTask(region, store, ranges);
}
@@ -246,7 +244,7 @@ public TiRegion getRegion() {
return region;
}
- public Metapb.Store getStore() {
+ public TiStore getStore() {
return store;
}
diff --git a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java
index 44d3c4a71ca..020068e272a 100644
--- a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java
+++ b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java
@@ -26,6 +26,7 @@
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb;
@@ -66,22 +67,23 @@ static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) {
}
static AbstractLockResolverClient getInstance(
- String storeVersion,
TiConfiguration conf,
TiRegion region,
+ TiStore store,
TikvGrpc.TikvBlockingStub blockingStub,
TikvGrpc.TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
- if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V3) < 0) {
+ if (StoreVersion.compareTo(store.getStore().getVersion(), Version.RESOLVE_LOCK_V3) < 0) {
return new LockResolverClientV2(
- conf, region, blockingStub, asyncStub, channelFactory, regionManager);
- } else if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V4) < 0) {
+ conf, region, store, blockingStub, asyncStub, channelFactory, regionManager);
+ } else if (StoreVersion.compareTo(store.getStore().getVersion(), Version.RESOLVE_LOCK_V4) < 0) {
return new LockResolverClientV3(
conf,
region,
+ store,
blockingStub,
asyncStub,
channelFactory,
@@ -92,6 +94,7 @@ static AbstractLockResolverClient getInstance(
return new LockResolverClientV4(
conf,
region,
+ store,
blockingStub,
asyncStub,
channelFactory,
diff --git a/src/main/java/org/tikv/txn/LockResolverClientV2.java b/src/main/java/org/tikv/txn/LockResolverClientV2.java
index 526c483116a..3df5966abda 100644
--- a/src/main/java/org/tikv/txn/LockResolverClientV2.java
+++ b/src/main/java/org/tikv/txn/LockResolverClientV2.java
@@ -42,6 +42,7 @@
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiRegion.RegionVerID;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.TsoUtils;
@@ -74,11 +75,12 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient
public LockResolverClientV2(
TiConfiguration conf,
TiRegion region,
+ TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager) {
- super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
+ super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
resolved = new HashMap<>();
recentResolved = new LinkedList<>();
readWriteLock = new ReentrantReadWriteLock();
diff --git a/src/main/java/org/tikv/txn/LockResolverClientV3.java b/src/main/java/org/tikv/txn/LockResolverClientV3.java
index 4ec1f676e8b..0b8d3c89a8c 100644
--- a/src/main/java/org/tikv/txn/LockResolverClientV3.java
+++ b/src/main/java/org/tikv/txn/LockResolverClientV3.java
@@ -39,10 +39,7 @@
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.operation.KVErrorHandler;
-import org.tikv.common.region.AbstractRegionStoreClient;
-import org.tikv.common.region.RegionManager;
-import org.tikv.common.region.RegionStoreClient;
-import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.*;
import org.tikv.common.region.TiRegion.RegionVerID;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
@@ -79,13 +76,14 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient
public LockResolverClientV3(
TiConfiguration conf,
TiRegion region,
+ TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
- super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
+ super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
resolved = new HashMap<>();
recentResolved = new LinkedList<>();
readWriteLock = new ReentrantReadWriteLock();
diff --git a/src/main/java/org/tikv/txn/LockResolverClientV4.java b/src/main/java/org/tikv/txn/LockResolverClientV4.java
index 9b23733553c..07a5552f0f7 100644
--- a/src/main/java/org/tikv/txn/LockResolverClientV4.java
+++ b/src/main/java/org/tikv/txn/LockResolverClientV4.java
@@ -39,10 +39,7 @@
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.operation.KVErrorHandler;
-import org.tikv.common.region.AbstractRegionStoreClient;
-import org.tikv.common.region.RegionManager;
-import org.tikv.common.region.RegionStoreClient;
-import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.*;
import org.tikv.common.region.TiRegion.RegionVerID;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
@@ -79,13 +76,14 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient
public LockResolverClientV4(
TiConfiguration conf,
TiRegion region,
+ TiStore store,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager,
PDClient pdClient,
RegionStoreClient.RegionStoreClientBuilder clientBuilder) {
- super(conf, region, channelFactory, blockingStub, asyncStub, regionManager);
+ super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager);
resolved = new HashMap<>();
recentResolved = new LinkedList<>();
readWriteLock = new ReentrantReadWriteLock();
diff --git a/src/main/java/org/tikv/txn/TTLManager.java b/src/main/java/org/tikv/txn/TTLManager.java
index 8424ff52f96..82f64fafe5f 100644
--- a/src/main/java/org/tikv/txn/TTLManager.java
+++ b/src/main/java/org/tikv/txn/TTLManager.java
@@ -30,11 +30,11 @@
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
-import org.tikv.kvproto.Metapb;
import org.tikv.txn.type.ClientRPCResult;
/**
@@ -105,9 +105,9 @@ private void doKeepAlive() {
}
private void sendTxnHeartBeat(BackOffer bo, long ttl) {
- Pair pair = regionManager.getRegionStorePairByKey(primaryLock);
+ Pair pair = regionManager.getRegionStorePairByKey(primaryLock);
TiRegion tiRegion = pair.first;
- Metapb.Store store = pair.second;
+ TiStore store = pair.second;
ClientRPCResult result = kvClient.txnHeartBeat(bo, primaryLock, startTS, ttl, tiRegion, store);
@@ -121,7 +121,7 @@ private void sendTxnHeartBeat(BackOffer bo, long ttl) {
new GrpcException(
String.format("sendTxnHeartBeat failed, regionId=%s", tiRegion.getId()),
result.getException()));
- this.regionManager.invalidateStore(store.getId());
+ this.regionManager.invalidateStore(store.getStore().getId());
this.regionManager.invalidateRegion(tiRegion);
// re-split keys and commit again.
sendTxnHeartBeat(bo, ttl);
diff --git a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java
index 8d7e8af42ad..ce2568261de 100644
--- a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java
+++ b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java
@@ -38,13 +38,13 @@
import org.tikv.common.exception.TiBatchWriteException;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Kvrpcpb.Op;
-import org.tikv.kvproto.Metapb;
import org.tikv.txn.type.BatchKeys;
import org.tikv.txn.type.ClientRPCResult;
import org.tikv.txn.type.GroupKeyResult;
@@ -150,9 +150,9 @@ public void prewritePrimaryKey(BackOffer backOffer, byte[] primaryKey, byte[] va
private void doPrewritePrimaryKeyWithRetry(BackOffer backOffer, ByteString key, ByteString value)
throws TiBatchWriteException {
- Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
+ Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
TiRegion tiRegion = pair.first;
- Metapb.Store store = pair.second;
+ TiStore store = pair.second;
Kvrpcpb.Mutation mutation;
if (!value.isEmpty()) {
@@ -205,9 +205,9 @@ public void commitPrimaryKey(BackOffer backOffer, byte[] key, long commitTs)
private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, long commitTs)
throws TiBatchWriteException {
- Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
+ Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
TiRegion tiRegion = pair.first;
- Metapb.Store store = pair.second;
+ TiStore store = pair.second;
ByteString[] keys = new ByteString[] {key};
// send rpc request to tikv server
@@ -339,11 +339,11 @@ private void doPrewriteSecondaryKeysInBatchesWithRetry(
// groups keys by region
GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer);
List batchKeyList = new LinkedList<>();
- Map, List> groupKeyMap = groupResult.getGroupsResult();
+ Map, List> groupKeyMap = groupResult.getGroupsResult();
- for (Map.Entry, List> entry : groupKeyMap.entrySet()) {
+ for (Map.Entry, List> entry : groupKeyMap.entrySet()) {
TiRegion tiRegion = entry.getKey().first;
- Metapb.Store store = entry.getKey().second;
+ TiStore store = entry.getKey().second;
this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), true, mutations);
}
@@ -454,7 +454,7 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry(
private void appendBatchBySize(
List batchKeyList,
TiRegion tiRegion,
- Metapb.Store store,
+ TiStore store,
List keys,
boolean sizeIncludeValue,
Map mutations) {
@@ -575,11 +575,11 @@ private void doCommitSecondaryKeysWithRetry(
// groups keys by region
GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer);
List batchKeyList = new ArrayList<>();
- Map, List> groupKeyMap = groupResult.getGroupsResult();
+ Map, List> groupKeyMap = groupResult.getGroupsResult();
- for (Map.Entry, List> entry : groupKeyMap.entrySet()) {
+ for (Map.Entry, List> entry : groupKeyMap.entrySet()) {
TiRegion tiRegion = entry.getKey().first;
- Metapb.Store store = entry.getKey().second;
+ TiStore store = entry.getKey().second;
this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), false, null);
}
@@ -619,13 +619,12 @@ private void doCommitSecondaryKeySingleBatchWithRetry(
private GroupKeyResult groupKeysByRegion(ByteString[] keys, int size, BackOffer backOffer)
throws TiBatchWriteException {
- Map, List> groups = new HashMap<>();
+ Map, List> groups = new HashMap<>();
int index = 0;
try {
for (; index < size; index++) {
ByteString key = keys[index];
- Pair pair =
- this.regionManager.getRegionStorePairByKey(key, backOffer);
+ Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer);
if (pair != null) {
groups.computeIfAbsent(pair, e -> new ArrayList<>()).add(key);
}
diff --git a/src/main/java/org/tikv/txn/TxnKVClient.java b/src/main/java/org/tikv/txn/TxnKVClient.java
index e03a1a1a51d..e16876f2373 100644
--- a/src/main/java/org/tikv/txn/TxnKVClient.java
+++ b/src/main/java/org/tikv/txn/TxnKVClient.java
@@ -33,11 +33,11 @@
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
-import org.tikv.kvproto.Metapb;
import org.tikv.txn.type.ClientRPCResult;
/** KV client of transaction APIs for GET/PUT/DELETE/SCAN */
@@ -94,7 +94,7 @@ public ClientRPCResult prewrite(
long lockTTL,
long startTs,
TiRegion tiRegion,
- Metapb.Store store) {
+ TiStore store) {
ClientRPCResult result = new ClientRPCResult(true, false, null);
// send request
RegionStoreClient client = clientBuilder.build(tiRegion, store);
@@ -116,7 +116,7 @@ public ClientRPCResult txnHeartBeat(
long startTs,
long ttl,
TiRegion tiRegion,
- Metapb.Store store) {
+ TiStore store) {
ClientRPCResult result = new ClientRPCResult(true, false, null);
// send request
RegionStoreClient client = clientBuilder.build(tiRegion, store);
@@ -148,7 +148,7 @@ public ClientRPCResult commit(
long startTs,
long commitTs,
TiRegion tiRegion,
- Metapb.Store store) {
+ TiStore store) {
ClientRPCResult result = new ClientRPCResult(true, false, null);
// send request
RegionStoreClient client = clientBuilder.build(tiRegion, store);
diff --git a/src/main/java/org/tikv/txn/type/BatchKeys.java b/src/main/java/org/tikv/txn/type/BatchKeys.java
index 57657d9958a..7b61c948f0c 100644
--- a/src/main/java/org/tikv/txn/type/BatchKeys.java
+++ b/src/main/java/org/tikv/txn/type/BatchKeys.java
@@ -19,16 +19,15 @@
import java.util.ArrayList;
import java.util.List;
import org.tikv.common.region.TiRegion;
-import org.tikv.kvproto.Metapb;
+import org.tikv.common.region.TiStore;
public class BatchKeys {
private final TiRegion region;
- private final Metapb.Store store;
+ private final TiStore store;
private List keys;
private final int sizeInBytes;
- public BatchKeys(
- TiRegion region, Metapb.Store store, List keysInput, int sizeInBytes) {
+ public BatchKeys(TiRegion region, TiStore store, List keysInput, int sizeInBytes) {
this.region = region;
this.store = store;
this.keys = new ArrayList<>();
@@ -48,7 +47,7 @@ public TiRegion getRegion() {
return region;
}
- public Metapb.Store getStore() {
+ public TiStore getStore() {
return store;
}
diff --git a/src/main/java/org/tikv/txn/type/GroupKeyResult.java b/src/main/java/org/tikv/txn/type/GroupKeyResult.java
index 334dac35daf..837385337a1 100644
--- a/src/main/java/org/tikv/txn/type/GroupKeyResult.java
+++ b/src/main/java/org/tikv/txn/type/GroupKeyResult.java
@@ -20,22 +20,22 @@
import java.util.List;
import java.util.Map;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.Pair;
-import org.tikv.kvproto.Metapb;
public class GroupKeyResult {
- private Map, List> groupsResult;
+ private Map, List> groupsResult;
public GroupKeyResult() {
this.groupsResult = new HashMap<>();
}
- public Map, List> getGroupsResult() {
+ public Map, List> getGroupsResult() {
return groupsResult;
}
- public void setGroupsResult(Map, List> groupsResult) {
+ public void setGroupsResult(Map, List> groupsResult) {
this.groupsResult = groupsResult;
}
}
diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java
index bfce6db50cd..a69e10ba383 100644
--- a/src/test/java/org/tikv/common/MockServerTest.java
+++ b/src/test/java/org/tikv/common/MockServerTest.java
@@ -32,6 +32,7 @@ public void setUp() throws IOException {
new TiRegion(
r,
r.getPeers(0),
+ null,
session.getConf().getIsolationLevel(),
session.getConf().getCommandPriority(),
KVMode.TXN,
diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java
index d382e21b520..112d5cb0ec6 100644
--- a/src/test/java/org/tikv/common/RegionManagerTest.java
+++ b/src/test/java/org/tikv/common/RegionManagerTest.java
@@ -26,10 +26,10 @@
import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
-import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.Metapb.StoreState;
public class RegionManagerTest extends PDMockServerTest {
@@ -115,7 +115,7 @@ public void getStoreByKey() throws Exception {
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
- Pair pair = mgr.getRegionStorePairByKey(searchKey);
+ Pair pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.first.getId(), storeId);
}
@@ -133,8 +133,8 @@ public void getStoreById() throws Exception {
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));
- Store store = mgr.getStoreById(storeId);
- assertEquals(store.getId(), storeId);
+ TiStore store = mgr.getStoreById(storeId);
+ assertEquals(store.getStore().getId(), storeId);
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java
index e74c5823aef..ec3fe88ccfd 100644
--- a/src/test/java/org/tikv/common/RegionStoreClientTest.java
+++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java
@@ -24,6 +24,7 @@
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
+import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
@@ -40,13 +41,14 @@ private RegionStoreClient createClientV3() {
}
private RegionStoreClient createClient(String version) {
- Metapb.Store store =
+ Metapb.Store meta =
Metapb.Store.newBuilder()
.setAddress(LOCAL_ADDR + ":" + port)
.setId(1)
.setState(Metapb.StoreState.Up)
.setVersion(version)
.build();
+ TiStore store = new TiStore(meta);
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(