From e1ec2b56d2e17febbb7bbd832a845000945b5625 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 15 Jun 2021 21:42:30 +0800 Subject: [PATCH 01/10] refactor kverrorhandler Signed-off-by: Little-Wallace --- .../tikv/common/operation/KVErrorHandler.java | 181 +--------------- .../common/operation/RegionErrorHandler.java | 201 ++++++++++++++++++ .../tikv/common/region/RegionStoreClient.java | 41 ++-- 3 files changed, 230 insertions(+), 193 deletions(-) create mode 100644 src/main/java/org/tikv/common/operation/RegionErrorHandler.java diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index 55bbdef44ee..a1ce31e7335 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -19,20 +19,14 @@ import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLockFast; -import com.google.protobuf.ByteString; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.KeyException; import org.tikv.common.region.RegionErrorReceiver; import org.tikv.common.region.RegionManager; -import org.tikv.common.region.TiRegion; -import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Kvrpcpb; @@ -43,16 +37,12 @@ // TODO: consider refactor to Builder mode public class KVErrorHandler implements ErrorHandler { private static final Logger logger = LoggerFactory.getLogger(KVErrorHandler.class); - // if a store does not have leader currently, store id is set to 0 - private static final int NO_LEADER_STORE_ID = 0; - private final Function getRegionError; private final Function getKeyError; private final Function resolveLockResultCallback; - private final RegionManager regionManager; - private final RegionErrorReceiver recv; private final AbstractLockResolverClient lockResolverClient; private final long callerStartTS; private final boolean forWrite; + private final RegionErrorHandler regionHandler; public KVErrorHandler( RegionManager regionManager, @@ -63,42 +53,14 @@ public KVErrorHandler( Function resolveLockResultCallback, long callerStartTS, boolean forWrite) { - this.recv = recv; + this.regionHandler = new RegionErrorHandler<>(regionManager, recv, getRegionError); this.lockResolverClient = lockResolverClient; - this.regionManager = regionManager; - this.getRegionError = getRegionError; this.getKeyError = getKeyError; this.resolveLockResultCallback = resolveLockResultCallback; this.callerStartTS = callerStartTS; this.forWrite = forWrite; } - public KVErrorHandler( - RegionManager regionManager, - RegionErrorReceiver recv, - Function getRegionError) { - this.recv = recv; - this.lockResolverClient = null; - this.regionManager = regionManager; - this.getRegionError = getRegionError; - this.getKeyError = resp -> null; - this.resolveLockResultCallback = resolveLock -> null; - this.callerStartTS = 0; - this.forWrite = false; - } - - private Errorpb.Error getRegionError(RespT resp) { - if (getRegionError != null) { - return getRegionError.apply(resp); - } - return null; - } - - private void invalidateRegionStoreCache(TiRegion ctxRegion) { - regionManager.invalidateRegion(ctxRegion); - regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); - } - private void resolveLock(BackOffer backOffer, Lock lock) { if (lockResolverClient != null) { logger.warn("resolving lock"); @@ -123,159 +85,32 @@ private void resolveLock(BackOffer backOffer, Lock lock) { @Override public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (resp == null) { - String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); + String msg = String.format("Request Failed with unknown reason for [%s]", regionHandler.getRegion()); logger.warn(msg); return handleRequestError(backOffer, new GrpcException(msg)); } - // Region error handling logic - Errorpb.Error error = getRegionError(resp); + Errorpb.Error error = regionHandler.getRegionError(resp); if (error != null) { - if (error.hasNotLeader()) { - // this error is reported from raftstore: - // peer of current request is not leader, the following might be its causes: - // 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD - // 2. leader of current region is missing, need to wait and then fetch region info from PD - long newStoreId = error.getNotLeader().getLeader().getStoreId(); - boolean retry; - - // update Leader here - logger.warn( - String.format( - "NotLeader Error with region id %d and store id %d, new store id %d", - recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId)); - - BackOffFunction.BackOffFuncType backOffFuncType; - // if there's current no leader, we do not trigger update pd cache logic - // since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail. - if (newStoreId != NO_LEADER_STORE_ID) { - // If update leader fails, we need to fetch new region info from pd, - // and re-split key range for new region. Setting retry to false will - // stop retry and enter handleCopResponse logic, which would use RegionMiss - // backOff strategy to wait, fetch new region and re-split key range. - // onNotLeader is only needed when updateLeader succeeds, thus switch - // to a new store address. - TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = - newRegion != null - && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion); - - backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; - } else { - logger.info( - String.format( - "Received zero store id, from region %d try next time", - recv.getRegion().getId())); - - backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss; - retry = false; - } - - if (!retry) { - this.regionManager.invalidateRegion(recv.getRegion()); - } - - backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); - - return retry; - } else if (error.hasStoreNotMatch()) { - // this error is reported from raftstore: - // store_id requested at the moment is inconsistent with that expected - // Solution:re-fetch from PD - long storeId = recv.getRegion().getLeader().getStoreId(); - long actualStoreId = error.getStoreNotMatch().getActualStoreId(); - logger.warn( - String.format( - "Store Not Match happened with region id %d, store id %d, actual store id %d", - recv.getRegion().getId(), storeId, actualStoreId)); - - this.regionManager.invalidateRegion(recv.getRegion()); - this.regionManager.invalidateStore(storeId); - // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId)); - // assume this is a low probability error, do not retry, just re-split the request by - // throwing it out. - return false; - } else if (error.hasEpochNotMatch()) { - // this error is reported from raftstore: - // region has outdated version,please try later. - logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion())); - this.regionManager.onRegionStale(recv.getRegion()); - return false; - } else if (error.hasServerIsBusy()) { - // this error is reported from kv: - // will occur when write pressure is high. Please try later. - logger.warn( - String.format( - "Server is busy for region [%s], reason: %s", - recv.getRegion(), error.getServerIsBusy().getReason())); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoServerBusy, - new StatusRuntimeException( - Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } else if (error.hasStaleCommand()) { - // this error is reported from raftstore: - // command outdated, please try later - logger.warn(String.format("Stale command for region [%s]", recv.getRegion())); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } else if (error.hasRaftEntryTooLarge()) { - logger.warn(String.format("Raft too large for region [%s]", recv.getRegion())); - throw new StatusRuntimeException( - Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())); - } else if (error.hasKeyNotInRegion()) { - // this error is reported from raftstore: - // key requested is not in current region - // should not happen here. - ByteString invalidKey = error.getKeyNotInRegion().getKey(); - logger.error( - String.format( - "Key not in region [%s] for key [%s], this error should not happen here.", - recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey))); - throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); - } - - logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); - // For other errors, we only drop cache here. - // Upper level may split this task. - invalidateRegionStoreCache(recv.getRegion()); - // retry if raft proposal is dropped, it indicates the store is in the middle of transition - if (error.getMessage().contains("Raft ProposalDropped")) { - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } + return regionHandler.handleRegionError(backOffer, error); } - boolean retry = false; - // Key error handling logic Kvrpcpb.KeyError keyError = getKeyError.apply(resp); if (keyError != null) { try { Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(keyError); resolveLock(backOffer, lock); - retry = true; + return true; } catch (KeyException e) { logger.warn("Unable to handle KeyExceptions other than LockException", e); } } - return retry; + return false; } @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - regionManager.onRequestFail(recv.getRegion()); - - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoTiKVRPC, - new GrpcException( - "send tikv request error: " + e.getMessage() + ", try next peer later", e)); - // TiKV maybe down, so do not retry in `callWithRetry` - // should re-fetch the new leader from PD and send request to it - return false; + return regionHandler.handleRequestError(backOffer, e); } } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java new file mode 100644 index 00000000000..69f6c346574 --- /dev/null +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -0,0 +1,201 @@ +package org.tikv.common.operation; + +import com.google.protobuf.ByteString; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.codec.KeyUtils; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.region.RegionErrorReceiver; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.TiRegion; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.kvproto.Errorpb; + +import java.util.function.Function; + +public class RegionErrorHandler implements ErrorHandler { + private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class); + // if a store does not have leader currently, store id is set to 0 + private static final int NO_LEADER_STORE_ID = 0; + private final Function getRegionError; + private final RegionManager regionManager; + private final RegionErrorReceiver recv; + + public RegionErrorHandler( + RegionManager regionManager, + RegionErrorReceiver recv, + Function getRegionError) { + this.recv = recv; + this.regionManager = regionManager; + this.getRegionError = getRegionError; + } + + @Override + public boolean handleResponseError(BackOffer backOffer, RespT resp) { + if (resp == null) { + String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); + logger.warn(msg); + return handleRequestError(backOffer, new GrpcException(msg)); + } + // Region error handling logic + Errorpb.Error error = getRegionError(resp); + if (error != null) { + return handleRegionError(backOffer, error); + } + return false; + } + + public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { + if (error.hasNotLeader()) { + // this error is reported from raftstore: + // peer of current request is not leader, the following might be its causes: + // 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD + // 2. leader of current region is missing, need to wait and then fetch region info from PD + long newStoreId = error.getNotLeader().getLeader().getStoreId(); + boolean retry; + + // update Leader here + logger.warn( + String.format( + "NotLeader Error with region id %d and store id %d, new store id %d", + recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId)); + + BackOffFunction.BackOffFuncType backOffFuncType; + // if there's current no leader, we do not trigger update pd cache logic + // since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail. + if (newStoreId != NO_LEADER_STORE_ID) { + // If update leader fails, we need to fetch new region info from pd, + // and re-split key range for new region. Setting retry to false will + // stop retry and enter handleCopResponse logic, which would use RegionMiss + // backOff strategy to wait, fetch new region and re-split key range. + // onNotLeader is only needed when updateLeader succeeds, thus switch + // to a new store address. + TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); + retry = + newRegion != null + && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion); + + backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; + } else { + logger.info( + String.format( + "Received zero store id, from region %d try next time", + recv.getRegion().getId())); + + backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss; + retry = false; + } + + if (!retry) { + this.regionManager.invalidateRegion(recv.getRegion()); + } + + backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); + + return retry; + } else if (error.hasStoreNotMatch()) { + // this error is reported from raftstore: + // store_id requested at the moment is inconsistent with that expected + // Solution:re-fetch from PD + long storeId = recv.getRegion().getLeader().getStoreId(); + long actualStoreId = error.getStoreNotMatch().getActualStoreId(); + logger.warn( + String.format( + "Store Not Match happened with region id %d, store id %d, actual store id %d", + recv.getRegion().getId(), storeId, actualStoreId)); + + this.regionManager.invalidateRegion(recv.getRegion()); + this.regionManager.invalidateStore(storeId); + // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId)); + // assume this is a low probability error, do not retry, just re-split the request by + // throwing it out. + return false; + } else if (error.hasEpochNotMatch()) { + // this error is reported from raftstore: + // region has outdated version,please try later. + logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion())); + this.regionManager.onRegionStale(recv.getRegion()); + return false; + } else if (error.hasServerIsBusy()) { + // this error is reported from kv: + // will occur when write pressure is high. Please try later. + logger.warn( + String.format( + "Server is busy for region [%s], reason: %s", + recv.getRegion(), error.getServerIsBusy().getReason())); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoServerBusy, + new StatusRuntimeException( + Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } else if (error.hasStaleCommand()) { + // this error is reported from raftstore: + // command outdated, please try later + logger.warn(String.format("Stale command for region [%s]", recv.getRegion())); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } else if (error.hasRaftEntryTooLarge()) { + logger.warn(String.format("Raft too large for region [%s]", recv.getRegion())); + throw new StatusRuntimeException( + Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())); + } else if (error.hasKeyNotInRegion()) { + // this error is reported from raftstore: + // key requested is not in current region + // should not happen here. + ByteString invalidKey = error.getKeyNotInRegion().getKey(); + logger.error( + String.format( + "Key not in region [%s] for key [%s], this error should not happen here.", + recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey))); + throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); + } + + logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); + // For other errors, we only drop cache here. + // Upper level may split this task. + invalidateRegionStoreCache(recv.getRegion()); + // retry if raft proposal is dropped, it indicates the store is in the middle of transition + if (error.getMessage().contains("Raft ProposalDropped")) { + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } + return false; + } + + @Override + public boolean handleRequestError(BackOffer backOffer, Exception e) { + regionManager.onRequestFail(recv.getRegion()); + + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoTiKVRPC, + new GrpcException( + "send tikv request error: " + e.getMessage() + ", try next peer later", e)); + // TiKV maybe down, so do not retry in `callWithRetry` + // should re-fetch the new leader from PD and send request to it + return false; + } + + public Errorpb.Error getRegionError(RespT resp) { + if (getRegionError != null) { + return getRegionError.apply(resp); + } + return null; + } + + public TiRegion getRegion() { + return recv.getRegion(); + } + + + private void invalidateRegionStoreCache(TiRegion ctxRegion) { + regionManager.invalidateRegion(ctxRegion); + regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); + } +} diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 6ea51aa9cb8..e6f37a99495 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -38,6 +38,7 @@ import org.tikv.common.Version; import org.tikv.common.exception.*; import org.tikv.common.operation.KVErrorHandler; +import org.tikv.common.operation.RegionErrorHandler; import org.tikv.common.streaming.StreamingResponse; import org.tikv.common.util.*; import org.tikv.kvproto.Coprocessor; @@ -806,8 +807,8 @@ public ByteString rawGet(BackOffer backOffer, ByteString key) { .setContext(region.getReplicaContext(storeType)) .setKey(key) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawGetResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetMethod(), factory, handler); return rawGetHelper(resp); @@ -841,8 +842,8 @@ public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) { .setContext(region.getReplicaContext(storeType)) .setKey(key) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawGetKeyTTLResponse resp = callWithRetry(backOffer, TikvGrpc.getRawGetKeyTTLMethod(), factory, handler); @@ -881,8 +882,8 @@ public void rawDelete(BackOffer backOffer, ByteString key) { .setKey(key) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawDeleteResponse resp = callWithRetry(backOffer, TikvGrpc.getRawDeleteMethod(), factory, handler); @@ -919,8 +920,8 @@ public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long t .setTtl(ttl) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawPutMethod(), factory, handler); rawPutHelper(resp); @@ -958,8 +959,8 @@ public ByteString rawPutIfAbsent( .setTtl(ttl) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawCASResponse resp = callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler); @@ -1000,8 +1001,8 @@ public List rawBatchGet(BackOffer backoffer, List keys) { .setContext(region.getReplicaContext(storeType)) .addAllKeys(keys) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawBatchGetResponse resp = callWithRetry(backoffer, TikvGrpc.getRawBatchGetMethod(), factory, handler); @@ -1037,8 +1038,8 @@ public void rawBatchPut(BackOffer backOffer, List kvPairs, long ttl, boo .setTtl(ttl) .setForCas(atomic) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawBatchPutResponse resp = callWithRetry(backOffer, TikvGrpc.getRawBatchPutMethod(), factory, handler); @@ -1088,8 +1089,8 @@ public void rawBatchDelete(BackOffer backoffer, List keys, boolean a .addAllKeys(keys) .setForCas(atomic) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawBatchDeleteResponse resp = callWithRetry(backoffer, TikvGrpc.getRawBatchDeleteMethod(), factory, handler); @@ -1135,8 +1136,8 @@ public List rawScan(BackOffer backOffer, ByteString key, int limit, bool .setLimit(limit) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawScanResponse resp = callWithRetry(backOffer, TikvGrpc.getRawScanMethod(), factory, handler); @@ -1180,8 +1181,8 @@ public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString .setEndKey(endKey) .build(); - KVErrorHandler handler = - new KVErrorHandler<>( + RegionErrorHandler handler = + new RegionErrorHandler( regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null); RawDeleteRangeResponse resp = callWithRetry(backOffer, TikvGrpc.getRawDeleteRangeMethod(), factory, handler); From bc8ce39bd09a96ee6930bbee23c6d45262267a6e Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 16 Jun 2021 11:47:17 +0800 Subject: [PATCH 02/10] add parameter store Signed-off-by: Little-Wallace --- .../java/org/tikv/common/ConfigUtils.java | 1 + .../java/org/tikv/common/TiConfiguration.java | 5 + src/main/java/org/tikv/common/TiSession.java | 6 +- .../tikv/common/operation/KVErrorHandler.java | 3 +- .../common/operation/RegionErrorHandler.java | 349 +++++++++--------- .../region/AbstractRegionStoreClient.java | 12 + .../common/region/RegionErrorReceiver.java | 2 + .../org/tikv/common/region/RegionManager.java | 18 +- .../tikv/common/region/RegionStoreClient.java | 11 +- .../java/org/tikv/common/region/TiRegion.java | 11 +- .../tikv/txn/AbstractLockResolverClient.java | 9 +- .../org/tikv/txn/LockResolverClientV2.java | 4 +- 12 files changed, 240 insertions(+), 191 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 6df036d4e05..cada32623af 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -48,6 +48,7 @@ public class ConfigUtils { public static final String TIKV_METRICS_PORT = "tikv.metrics.port"; public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping"; + public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "600ms"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 0e97028d06f..f963b1c6386 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -245,6 +245,7 @@ private static ReplicaRead getReplicaRead(String key) { private boolean showRowId = getBoolean(TIKV_SHOW_ROWID); private String dbPrefix = get(TIKV_DB_PREFIX); private KVMode kvMode = getKvMode(TIKV_KV_MODE); + private boolean enableGrpcForward = getBoolean(TIKV_ENABLE_GRPC_FORWARD); private int kvClientConcurrency = getInt(TIKV_KV_CLIENT_CONCURRENCY); private ReplicaRead replicaRead = getReplicaRead(TIKV_REPLICA_READ); @@ -532,4 +533,8 @@ public TiConfiguration setMetricsPort(int metricsPort) { public String getNetworkMappingName() { return this.networkMappingName; } + + public boolean getEnableGrpcForward() { + return this.enableGrpcForward; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 41410d64d3b..4648853f928 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -71,6 +71,7 @@ public class TiSession implements AutoCloseable { private volatile ExecutorService batchScanThreadPool; private volatile ExecutorService deleteRangeThreadPool; private volatile RegionManager regionManager; + private volatile boolean enableGrpcForward; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private boolean isClosed = false; private HTTPServer server; @@ -90,6 +91,7 @@ public TiSession(TiConfiguration conf) { this.collectorRegistry.register(RetryPolicy.GRPC_SINGLE_REQUEST_LATENCY); this.collectorRegistry.register(RegionManager.GET_REGION_BY_KEY_REQUEST_LATENCY); this.collectorRegistry.register(PDClient.PD_GET_REGION_BY_KEY_REQUEST_LATENCY); + this.enableGrpcForward = conf.getEnableGrpcForward(); this.server = new HTTPServer( new InetSocketAddress(conf.getMetricsPort()), this.collectorRegistry, true); @@ -199,7 +201,9 @@ public synchronized RegionManager getRegionManager() { if (res == null) { synchronized (this) { if (regionManager == null) { - regionManager = new RegionManager(getPDClient(), this.cacheInvalidateCallback); + regionManager = + new RegionManager( + getPDClient(), this.cacheInvalidateCallback, this.enableGrpcForward); } res = regionManager; } diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index a1ce31e7335..3321b560740 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -85,7 +85,8 @@ private void resolveLock(BackOffer backOffer, Lock lock) { @Override public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (resp == null) { - String msg = String.format("Request Failed with unknown reason for [%s]", regionHandler.getRegion()); + String msg = + String.format("Request Failed with unknown reason for [%s]", regionHandler.getRegion()); logger.warn(msg); return handleRequestError(backOffer, new GrpcException(msg)); } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 69f6c346574..4137854e752 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -3,6 +3,7 @@ import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.codec.KeyUtils; @@ -14,188 +15,184 @@ import org.tikv.common.util.BackOffer; import org.tikv.kvproto.Errorpb; -import java.util.function.Function; - -public class RegionErrorHandler implements ErrorHandler { - private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class); - // if a store does not have leader currently, store id is set to 0 - private static final int NO_LEADER_STORE_ID = 0; - private final Function getRegionError; - private final RegionManager regionManager; - private final RegionErrorReceiver recv; - - public RegionErrorHandler( - RegionManager regionManager, - RegionErrorReceiver recv, - Function getRegionError) { - this.recv = recv; - this.regionManager = regionManager; - this.getRegionError = getRegionError; +public class RegionErrorHandler implements ErrorHandler { + private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class); + // if a store does not have leader currently, store id is set to 0 + private static final int NO_LEADER_STORE_ID = 0; + private final Function getRegionError; + private final RegionManager regionManager; + private final RegionErrorReceiver recv; + + public RegionErrorHandler( + RegionManager regionManager, + RegionErrorReceiver recv, + Function getRegionError) { + this.recv = recv; + this.regionManager = regionManager; + this.getRegionError = getRegionError; + } + + @Override + public boolean handleResponseError(BackOffer backOffer, RespT resp) { + if (resp == null) { + String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); + logger.warn(msg); + return handleRequestError(backOffer, new GrpcException(msg)); } - - @Override - public boolean handleResponseError(BackOffer backOffer, RespT resp) { - if (resp == null) { - String msg = String.format("Request Failed with unknown reason for [%s]", recv.getRegion()); - logger.warn(msg); - return handleRequestError(backOffer, new GrpcException(msg)); - } - // Region error handling logic - Errorpb.Error error = getRegionError(resp); - if (error != null) { - return handleRegionError(backOffer, error); - } - return false; + // Region error handling logic + Errorpb.Error error = getRegionError(resp); + if (error != null) { + return handleRegionError(backOffer, error); } - - public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { - if (error.hasNotLeader()) { - // this error is reported from raftstore: - // peer of current request is not leader, the following might be its causes: - // 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD - // 2. leader of current region is missing, need to wait and then fetch region info from PD - long newStoreId = error.getNotLeader().getLeader().getStoreId(); - boolean retry; - - // update Leader here - logger.warn( - String.format( - "NotLeader Error with region id %d and store id %d, new store id %d", - recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId)); - - BackOffFunction.BackOffFuncType backOffFuncType; - // if there's current no leader, we do not trigger update pd cache logic - // since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail. - if (newStoreId != NO_LEADER_STORE_ID) { - // If update leader fails, we need to fetch new region info from pd, - // and re-split key range for new region. Setting retry to false will - // stop retry and enter handleCopResponse logic, which would use RegionMiss - // backOff strategy to wait, fetch new region and re-split key range. - // onNotLeader is only needed when updateLeader succeeds, thus switch - // to a new store address. - TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = - newRegion != null - && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion); - - backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; - } else { - logger.info( - String.format( - "Received zero store id, from region %d try next time", - recv.getRegion().getId())); - - backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss; - retry = false; - } - - if (!retry) { - this.regionManager.invalidateRegion(recv.getRegion()); - } - - backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); - - return retry; - } else if (error.hasStoreNotMatch()) { - // this error is reported from raftstore: - // store_id requested at the moment is inconsistent with that expected - // Solution:re-fetch from PD - long storeId = recv.getRegion().getLeader().getStoreId(); - long actualStoreId = error.getStoreNotMatch().getActualStoreId(); - logger.warn( - String.format( - "Store Not Match happened with region id %d, store id %d, actual store id %d", - recv.getRegion().getId(), storeId, actualStoreId)); - - this.regionManager.invalidateRegion(recv.getRegion()); - this.regionManager.invalidateStore(storeId); - // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId)); - // assume this is a low probability error, do not retry, just re-split the request by - // throwing it out. - return false; - } else if (error.hasEpochNotMatch()) { - // this error is reported from raftstore: - // region has outdated version,please try later. - logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion())); - this.regionManager.onRegionStale(recv.getRegion()); - return false; - } else if (error.hasServerIsBusy()) { - // this error is reported from kv: - // will occur when write pressure is high. Please try later. - logger.warn( - String.format( - "Server is busy for region [%s], reason: %s", - recv.getRegion(), error.getServerIsBusy().getReason())); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoServerBusy, - new StatusRuntimeException( - Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } else if (error.hasStaleCommand()) { - // this error is reported from raftstore: - // command outdated, please try later - logger.warn(String.format("Stale command for region [%s]", recv.getRegion())); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } else if (error.hasRaftEntryTooLarge()) { - logger.warn(String.format("Raft too large for region [%s]", recv.getRegion())); - throw new StatusRuntimeException( - Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())); - } else if (error.hasKeyNotInRegion()) { - // this error is reported from raftstore: - // key requested is not in current region - // should not happen here. - ByteString invalidKey = error.getKeyNotInRegion().getKey(); - logger.error( - String.format( - "Key not in region [%s] for key [%s], this error should not happen here.", - recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey))); - throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); - } - - logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); - // For other errors, we only drop cache here. - // Upper level may split this task. - invalidateRegionStoreCache(recv.getRegion()); - // retry if raft proposal is dropped, it indicates the store is in the middle of transition - if (error.getMessage().contains("Raft ProposalDropped")) { - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); - return true; - } - return false; + return false; + } + + public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { + if (error.hasNotLeader()) { + // this error is reported from raftstore: + // peer of current request is not leader, the following might be its causes: + // 1. cache is outdated, region has changed its leader, can be solved by re-fetching from PD + // 2. leader of current region is missing, need to wait and then fetch region info from PD + long newStoreId = error.getNotLeader().getLeader().getStoreId(); + boolean retry; + + // update Leader here + logger.warn( + String.format( + "NotLeader Error with region id %d and store id %d, new store id %d", + recv.getRegion().getId(), recv.getRegion().getLeader().getStoreId(), newStoreId)); + + BackOffFunction.BackOffFuncType backOffFuncType; + // if there's current no leader, we do not trigger update pd cache logic + // since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail. + if (newStoreId != NO_LEADER_STORE_ID) { + // If update leader fails, we need to fetch new region info from pd, + // and re-split key range for new region. Setting retry to false will + // stop retry and enter handleCopResponse logic, which would use RegionMiss + // backOff strategy to wait, fetch new region and re-split key range. + // onNotLeader is only needed when updateLeader succeeds, thus switch + // to a new store address. + TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); + retry = + newRegion != null + && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion); + + backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; + } else { + logger.info( + String.format( + "Received zero store id, from region %d try next time", recv.getRegion().getId())); + + backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss; + retry = false; + } + + if (!retry) { + this.regionManager.invalidateRegion(recv.getRegion()); + } + + backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); + + return retry; + } else if (error.hasStoreNotMatch()) { + // this error is reported from raftstore: + // store_id requested at the moment is inconsistent with that expected + // Solution:re-fetch from PD + long storeId = recv.getRegion().getLeader().getStoreId(); + long actualStoreId = error.getStoreNotMatch().getActualStoreId(); + logger.warn( + String.format( + "Store Not Match happened with region id %d, store id %d, actual store id %d", + recv.getRegion().getId(), storeId, actualStoreId)); + + this.regionManager.invalidateRegion(recv.getRegion()); + this.regionManager.invalidateStore(storeId); + // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId)); + // assume this is a low probability error, do not retry, just re-split the request by + // throwing it out. + return false; + } else if (error.hasEpochNotMatch()) { + // this error is reported from raftstore: + // region has outdated version,please try later. + logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion())); + this.regionManager.onRegionStale(recv.getRegion()); + return false; + } else if (error.hasServerIsBusy()) { + // this error is reported from kv: + // will occur when write pressure is high. Please try later. + logger.warn( + String.format( + "Server is busy for region [%s], reason: %s", + recv.getRegion(), error.getServerIsBusy().getReason())); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoServerBusy, + new StatusRuntimeException( + Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } else if (error.hasStaleCommand()) { + // this error is reported from raftstore: + // command outdated, please try later + logger.warn(String.format("Stale command for region [%s]", recv.getRegion())); + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; + } else if (error.hasRaftEntryTooLarge()) { + logger.warn(String.format("Raft too large for region [%s]", recv.getRegion())); + throw new StatusRuntimeException( + Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())); + } else if (error.hasKeyNotInRegion()) { + // this error is reported from raftstore: + // key requested is not in current region + // should not happen here. + ByteString invalidKey = error.getKeyNotInRegion().getKey(); + logger.error( + String.format( + "Key not in region [%s] for key [%s], this error should not happen here.", + recv.getRegion(), KeyUtils.formatBytesUTF8(invalidKey))); + throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); } - @Override - public boolean handleRequestError(BackOffer backOffer, Exception e) { - regionManager.onRequestFail(recv.getRegion()); - - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoTiKVRPC, - new GrpcException( - "send tikv request error: " + e.getMessage() + ", try next peer later", e)); - // TiKV maybe down, so do not retry in `callWithRetry` - // should re-fetch the new leader from PD and send request to it - return false; + logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); + // For other errors, we only drop cache here. + // Upper level may split this task. + invalidateRegionStoreCache(recv.getRegion()); + // retry if raft proposal is dropped, it indicates the store is in the middle of transition + if (error.getMessage().contains("Raft ProposalDropped")) { + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); + return true; } - - public Errorpb.Error getRegionError(RespT resp) { - if (getRegionError != null) { - return getRegionError.apply(resp); - } - return null; - } - - public TiRegion getRegion() { - return recv.getRegion(); + return false; + } + + @Override + public boolean handleRequestError(BackOffer backOffer, Exception e) { + regionManager.onRequestFail(recv.getRegion()); + + backOffer.doBackOff( + BackOffFunction.BackOffFuncType.BoTiKVRPC, + new GrpcException( + "send tikv request error: " + e.getMessage() + ", try next peer later", e)); + // TiKV maybe down, so do not retry in `callWithRetry` + // should re-fetch the new leader from PD and send request to it + return false; + } + + public Errorpb.Error getRegionError(RespT resp) { + if (getRegionError != null) { + return getRegionError.apply(resp); } + return null; + } + public TiRegion getRegion() { + return recv.getRegion(); + } - private void invalidateRegionStoreCache(TiRegion ctxRegion) { - regionManager.invalidateRegion(ctxRegion); - regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); - } + private void invalidateRegionStoreCache(TiRegion ctxRegion) { + regionManager.invalidateRegion(ctxRegion); + regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); + } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 34942688859..b6155f6b740 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -35,10 +35,12 @@ public abstract class AbstractRegionStoreClient protected final RegionManager regionManager; protected TiRegion region; + protected Metapb.Store store; protected AbstractRegionStoreClient( TiConfiguration conf, TiRegion region, + Metapb.Store store, ChannelFactory channelFactory, TikvGrpc.TikvBlockingStub blockingStub, TikvGrpc.TikvStub asyncStub, @@ -49,6 +51,7 @@ protected AbstractRegionStoreClient( checkArgument(region.getLeader() != null, "Leader Peer is null"); this.region = region; this.regionManager = regionManager; + this.store = store; } public TiRegion getRegion() { @@ -110,4 +113,13 @@ public void onStoreNotMatch(Metapb.Store store) { + addressStr); } } + + @Override + public boolean onStoreUnreachable() { + return false; + } + + private boolean checkHealth() { + return false; + } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 42c2d6aeba3..abdbbaf2e60 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -22,6 +22,8 @@ public interface RegionErrorReceiver { boolean onNotLeader(Store store, TiRegion region); + boolean onStoreUnreachable(); + void onStoreNotMatch(Store store); TiRegion getRegion(); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 745b53399f4..63c377f24aa 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -50,6 +50,7 @@ public class RegionManager { // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 private final RegionCache cache; + private final boolean enableGrpcForward; private final Function cacheInvalidateCallback; @@ -65,11 +66,22 @@ public RegionManager( ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) { this.cache = new RegionCache(pdClient); this.cacheInvalidateCallback = cacheInvalidateCallback; + this.enableGrpcForward = false; + } + + public RegionManager( + ReadOnlyPDClient pdClient, + Function cacheInvalidateCallback, + boolean enableGrpcForward) { + this.cache = new RegionCache(pdClient); + this.cacheInvalidateCallback = cacheInvalidateCallback; + this.enableGrpcForward = enableGrpcForward; } public RegionManager(ReadOnlyPDClient pdClient) { this.cache = new RegionCache(pdClient); this.cacheInvalidateCallback = null; + this.enableGrpcForward = false; } public Function getCacheInvalidateCallback() { @@ -194,8 +206,10 @@ public void onRequestFail(TiRegion region) { } private void onRequestFail(TiRegion region, long storeId) { - cache.invalidateRegion(region); - cache.invalidateAllRegionForStore(storeId); + if (this.enableGrpcForward) { + cache.invalidateRegion(region); + cache.invalidateAllRegionForStore(storeId); + } } public void invalidateStore(long storeId) { diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index e6f37a99495..896f3060499 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -88,7 +88,7 @@ private synchronized Boolean getIsV4() { private RegionStoreClient( TiConfiguration conf, TiRegion region, - String storeVersion, + Store store, TiStoreType storeType, ChannelFactory channelFactory, TikvBlockingStub blockingStub, @@ -96,15 +96,15 @@ private RegionStoreClient( RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { - super(conf, region, channelFactory, blockingStub, asyncStub, regionManager); + super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager); this.storeType = storeType; if (this.storeType == TiStoreType.TiKV) { this.lockResolverClient = AbstractLockResolverClient.getInstance( - storeVersion, conf, region, + store, this.blockingStub, this.asyncStub, channelFactory, @@ -127,9 +127,9 @@ private RegionStoreClient( this.lockResolverClient = AbstractLockResolverClient.getInstance( - tikvStore.getVersion(), conf, region, + tikvStore, tikvBlockingStub, tikvAsyncStub, channelFactory, @@ -788,6 +788,7 @@ public List splitRegion(Iterable splitKeys) { new TiRegion( region, null, + null, conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), @@ -1262,7 +1263,7 @@ public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeTy return new RegionStoreClient( conf, region, - store.getVersion(), + store, storeType, channelFactory, blockingStub, diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 6170d0bbc92..beedd105c18 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -50,11 +50,13 @@ public class TiRegion implements Serializable { private final Peer leader; private final ReplicaSelector replicaSelector; private final List replicaList; + private final Metapb.Store proxyStore; private int replicaIdx; public TiRegion( Region meta, Peer leader, + Metapb.Store proxyStore, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, KVMode kvMode, @@ -65,6 +67,7 @@ public TiRegion( this.isolationLevel = isolationLevel; this.commandPri = commandPri; this.replicaSelector = replicaSelector; + this.proxyStore = proxyStore; if (leader == null || leader.getId() == 0) { if (meta.getPeersCount() == 0) { throw new TiClientInternalException("Empty peer list for region " + meta.getId()); @@ -209,7 +212,13 @@ public TiRegion switchPeer(long leaderStoreID) { for (Peer p : peers) { if (p.getStoreId() == leaderStoreID) { return new TiRegion( - this.meta, p, this.isolationLevel, this.commandPri, this.kvMode, this.replicaSelector); + this.meta, + p, + this.proxyStore, + this.isolationLevel, + this.commandPri, + this.kvMode, + this.replicaSelector); } } return null; diff --git a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java index 44d3c4a71ca..d9006de34ea 100644 --- a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java +++ b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java @@ -29,6 +29,7 @@ import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.kvproto.Kvrpcpb; +import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; public interface AbstractLockResolverClient { @@ -66,19 +67,19 @@ static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) { } static AbstractLockResolverClient getInstance( - String storeVersion, TiConfiguration conf, TiRegion region, + Metapb.Store store, TikvGrpc.TikvBlockingStub blockingStub, TikvGrpc.TikvStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { - if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V3) < 0) { + if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V3) < 0) { return new LockResolverClientV2( - conf, region, blockingStub, asyncStub, channelFactory, regionManager); - } else if (StoreVersion.compareTo(storeVersion, Version.RESOLVE_LOCK_V4) < 0) { + conf, region, store, blockingStub, asyncStub, channelFactory, regionManager); + } else if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V4) < 0) { return new LockResolverClientV3( conf, region, diff --git a/src/main/java/org/tikv/txn/LockResolverClientV2.java b/src/main/java/org/tikv/txn/LockResolverClientV2.java index 526c483116a..718ddec50f5 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV2.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV2.java @@ -49,6 +49,7 @@ import org.tikv.kvproto.Kvrpcpb.CleanupResponse; import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest; import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse; +import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvStub; @@ -74,11 +75,12 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient public LockResolverClientV2( TiConfiguration conf, TiRegion region, + Metapb.Store store, TikvBlockingStub blockingStub, TikvStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager) { - super(conf, region, channelFactory, blockingStub, asyncStub, regionManager); + super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager); resolved = new HashMap<>(); recentResolved = new LinkedList<>(); readWriteLock = new ReentrantReadWriteLock(); From 2c0492b81e07ad0ebb4cda6e3568c1fb4d3833f9 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 16 Jun 2021 15:45:37 +0800 Subject: [PATCH 03/10] support grpc forward for tikv client Signed-off-by: Little-Wallace --- pom.xml | 5 + src/main/java/org/tikv/cdc/CDCClient.java | 6 +- src/main/java/org/tikv/common/PDClient.java | 4 + .../java/org/tikv/common/TiConfiguration.java | 4 + src/main/java/org/tikv/common/TiSession.java | 6 +- .../common/operation/RegionErrorHandler.java | 16 +++- .../iterator/ConcreteScanIterator.java | 6 +- .../operation/iterator/DAGIterator.java | 6 +- .../region/AbstractRegionStoreClient.java | 92 ++++++++++++++----- .../common/region/RegionErrorReceiver.java | 6 +- .../org/tikv/common/region/RegionManager.java | 53 ++++++++--- .../tikv/common/region/RegionStoreClient.java | 42 ++++++--- .../java/org/tikv/common/region/TiRegion.java | 19 +++- .../java/org/tikv/common/region/TiStore.java | 26 ++++++ .../org/tikv/common/util/RangeSplitter.java | 34 ++++--- .../tikv/txn/AbstractLockResolverClient.java | 10 +- .../org/tikv/txn/LockResolverClientV2.java | 4 +- .../org/tikv/txn/LockResolverClientV3.java | 8 +- .../org/tikv/txn/LockResolverClientV4.java | 8 +- src/main/java/org/tikv/txn/TTLManager.java | 8 +- .../java/org/tikv/txn/TwoPhaseCommitter.java | 29 +++--- src/main/java/org/tikv/txn/TxnKVClient.java | 8 +- .../java/org/tikv/txn/type/BatchKeys.java | 9 +- .../org/tikv/txn/type/GroupKeyResult.java | 8 +- .../java/org/tikv/common/MockServerTest.java | 1 + 25 files changed, 278 insertions(+), 140 deletions(-) create mode 100644 src/main/java/org/tikv/common/region/TiStore.java diff --git a/pom.xml b/pom.xml index 4e4cff5be70..ed6ed478ae9 100644 --- a/pom.xml +++ b/pom.xml @@ -126,6 +126,11 @@ grpc-stub ${grpc.version} + + io.grpc + grpc-all + ${grpc.version} + io.grpc grpc-testing diff --git a/src/main/java/org/tikv/cdc/CDCClient.java b/src/main/java/org/tikv/cdc/CDCClient.java index 1617773d505..c6ee84a3526 100644 --- a/src/main/java/org/tikv/cdc/CDCClient.java +++ b/src/main/java/org/tikv/cdc/CDCClient.java @@ -149,7 +149,11 @@ private synchronized void addRegions(final Iterable regions, final lon for (final TiRegion region : regions) { if (overlapWithRegion(region)) { final String address = - session.getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress(); + session + .getRegionManager() + .getStoreById(region.getLeader().getStoreId()) + .getStore() + .getAddress(); final ManagedChannel channel = session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping()); try { diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index ac4d4f39182..20693547d39 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -241,6 +241,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) { return new TiRegion( resp.getRegion(), resp.getLeader(), + null, conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), @@ -258,6 +259,7 @@ public Future getRegionByKeyAsync(BackOffer backOffer, ByteString key) new TiRegion( resp.getRegion(), resp.getLeader(), + null, conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), @@ -285,6 +287,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) { return new TiRegion( resp.getRegion(), resp.getLeader(), + null, conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), @@ -299,6 +302,7 @@ public Future getRegionByIDAsync(BackOffer backOffer, long id) { new TiRegion( resp.getRegion(), resp.getLeader(), + null, conf.getIsolationLevel(), conf.getCommandPriority(), conf.getKvMode(), diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index f963b1c6386..21532071cfe 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -17,6 +17,7 @@ import static org.tikv.common.ConfigUtils.*; +import io.grpc.Metadata; import java.io.Serializable; import java.net.URI; import java.util.*; @@ -32,6 +33,8 @@ public class TiConfiguration implements Serializable { private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class); private static final ConcurrentHashMap settings = new ConcurrentHashMap<>(); + public static final Metadata.Key FORWARD_META_DATA_KEY = + Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); static { loadFromSystemProperties(); @@ -72,6 +75,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE); setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT); setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME); + setIfMissing(TIKV_ENABLE_GRPC_FORWARD, false); } public static void listAll() { diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 4648853f928..ff0502e46c3 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -43,8 +43,8 @@ import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.*; -import org.tikv.kvproto.Metapb; import org.tikv.raw.RawKVClient; import org.tikv.txn.KVClient; import org.tikv.txn.TxnKVClient; @@ -419,10 +419,10 @@ private List splitRegion(List splitKeys, BackOffer backOff groupKeysByRegion(regionManager, splitKeys, backOffer); for (Map.Entry> entry : groupKeys.entrySet()) { - Pair pair = + Pair pair = getRegionManager().getRegionStorePairByKey(entry.getKey().getStartKey()); TiRegion region = pair.first; - Metapb.Store store = pair.second; + TiStore store = pair.second; List splits = entry .getValue() diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 4137854e752..ccfe46a558f 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -73,9 +73,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // onNotLeader is only needed when updateLeader succeeds, thus switch // to a new store address. TiRegion newRegion = this.regionManager.updateLeader(recv.getRegion(), newStoreId); - retry = - newRegion != null - && recv.onNotLeader(this.regionManager.getStoreById(newStoreId), newRegion); + retry = newRegion != null && recv.onNotLeader(newRegion); backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader; } else { @@ -107,7 +105,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { this.regionManager.invalidateRegion(recv.getRegion()); this.regionManager.invalidateStore(storeId); - // recv.onStoreNotMatch(this.regionManager.getStoreById(storeId)); // assume this is a low probability error, do not retry, just re-split the request by // throwing it out. return false; @@ -169,7 +166,16 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - regionManager.onRequestFail(recv.getRegion()); + Status status = Status.fromThrowable(e); + if (status == Status.UNAVAILABLE || status == Status.DEADLINE_EXCEEDED) { + if (recv.onStoreUnreachable()) { + return true; + } else { + regionManager.onRequestFail(recv.getRegion()); + } + } else { + regionManager.onRequestFail(recv.getRegion()); + } backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoTiKVRPC, diff --git a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java index e563296ab51..1f5ac6fcf36 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java @@ -27,11 +27,11 @@ import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.Pair; import org.tikv.kvproto.Kvrpcpb; -import org.tikv.kvproto.Metapb; public class ConcreteScanIterator extends ScanIterator { private final long version; @@ -82,10 +82,10 @@ TiRegion loadCurrentRegionToCache() throws GrpcException { private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) { logger.warn(String.format("resolve current key error %s", current.getError().toString())); - Pair pair = + Pair pair = builder.getRegionManager().getRegionStorePairByKey(current.getKey()); TiRegion region = pair.first; - Metapb.Store store = pair.second; + TiStore store = pair.second; BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); try (RegionStoreClient client = builder.build(region, store)) { return client.get(backOffer, current.getKey(), version); diff --git a/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java b/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java index f9dc40536a9..36b9fbc14e5 100644 --- a/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/DAGIterator.java @@ -32,12 +32,12 @@ import org.tikv.common.operation.SchemaInfer; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.region.TiStoreType; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.RangeSplitter; import org.tikv.kvproto.Coprocessor; -import org.tikv.kvproto.Metapb; public abstract class DAGIterator extends org.tikv.common.operation.iterator.CoprocessorIterator { @@ -204,7 +204,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) { } List ranges = task.getRanges(); TiRegion region = task.getRegion(); - Metapb.Store store = task.getStore(); + TiStore store = task.getStore(); try { RegionStoreClient client = @@ -245,7 +245,7 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) { private Iterator processByStreaming(RangeSplitter.RegionTask regionTask) { List ranges = regionTask.getRanges(); TiRegion region = regionTask.getRegion(); - Metapb.Store store = regionTask.getStore(); + TiStore store = regionTask.getStore(); RegionStoreClient client; try { diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index b6155f6b740..72cd9b3a872 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -21,6 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; +import java.util.List; import java.util.concurrent.TimeUnit; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; @@ -35,12 +38,12 @@ public abstract class AbstractRegionStoreClient protected final RegionManager regionManager; protected TiRegion region; - protected Metapb.Store store; + protected TiStore targetStore; protected AbstractRegionStoreClient( TiConfiguration conf, TiRegion region, - Metapb.Store store, + TiStore store, ChannelFactory channelFactory, TikvGrpc.TikvBlockingStub blockingStub, TikvGrpc.TikvStub asyncStub, @@ -51,7 +54,7 @@ protected AbstractRegionStoreClient( checkArgument(region.getLeader() != null, "Leader Peer is null"); this.region = region; this.regionManager = regionManager; - this.store = store; + this.targetStore = store; } public TiRegion getRegion() { @@ -78,9 +81,9 @@ public void close() throws GrpcException {} * @return false when re-split is needed. */ @Override - public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) { + public boolean onNotLeader(TiRegion newRegion) { if (logger.isDebugEnabled()) { - logger.debug(region + ", new leader = " + newStore.getId()); + logger.debug(region + ", new leader = " + newRegion.getLeader().getStoreId()); } // When switch leader fails or the region changed its region epoch, // it would be necessary to re-split task's key range for new region. @@ -88,7 +91,8 @@ public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) { return false; } region = newRegion; - String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress(); + targetStore = regionManager.getStoreById(region.getLeader().getStoreId()); + String addressStr = targetStore.getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); @@ -97,29 +101,69 @@ public boolean onNotLeader(Metapb.Store newStore, TiRegion newRegion) { } @Override - public void onStoreNotMatch(Metapb.Store store) { - String addressStr = store.getAddress(); + public boolean onStoreUnreachable() { + if (!conf.getEnableGrpcForward()) { + return false; + } + if (region.getProxyStore() != null) { + TiStore store = region.getProxyStore(); + if (checkHealth(store)) { + return true; + } else { + store.invalid(); + region = region.switchProxyStore(null); + regionManager.updateRegion(region); + } + } else { + if (checkHealth(targetStore)) { + return true; + } else { + targetStore.invalid(); + } + } + TiRegion proxyRegion = switchProxyStore(); + if (proxyRegion == null) { + return false; + } + region = proxyRegion; + regionManager.updateRegion(proxyRegion); + String addressStr = region.getProxyStore().getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); - if (region.getLeader().getStoreId() != store.getId()) { - logger.warn( - "store_not_match may occur? " - + region - + ", original store = " - + store.getId() - + " address = " - + addressStr); - } + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress()); + blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); + asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); + return true; } - @Override - public boolean onStoreUnreachable() { - return false; + private boolean checkHealth(TiStore store) { + return true; } - private boolean checkHealth() { - return false; + private TiRegion switchProxyStore() { + boolean hasVisitedStore = false; + List peers = region.getFollowerList(); + for (int i = 0; i < peers.size() * 2; i++) { + int idx = i % peers.size(); + Metapb.Peer peer = peers.get(idx); + if (peer.getStoreId() != region.getLeader().getStoreId()) { + if (region.getProxyStore() == null) { + TiStore store = regionManager.getStoreById(peer.getStoreId()); + return region.switchProxyStore(store); + } else { + TiStore proxyStore = region.getProxyStore(); + if (peer.getStoreId() == proxyStore.getStore().getId()) { + hasVisitedStore = true; + } else if (hasVisitedStore) { + proxyStore = regionManager.getStoreById(peer.getStoreId()); + if (!proxyStore.isUnreachable() && checkHealth(proxyStore)) { + return region.switchProxyStore(proxyStore); + } + } + } + } + } + return null; } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index abdbbaf2e60..3f4af86c818 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -17,14 +17,10 @@ package org.tikv.common.region; -import org.tikv.kvproto.Metapb.Store; - public interface RegionErrorReceiver { - boolean onNotLeader(Store store, TiRegion region); + boolean onNotLeader(TiRegion region); boolean onStoreUnreachable(); - void onStoreNotMatch(Store store); - TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 63c377f24aa..38645717229 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -41,7 +41,6 @@ import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb.Peer; -import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.StoreState; @SuppressWarnings("UnstableApiUsage") @@ -111,19 +110,19 @@ public TiRegion getRegionById(long regionId) { return cache.getRegionById(ConcreteBackOffer.newGetBackOff(), regionId); } - public Pair getRegionStorePairByKey(ByteString key, BackOffer backOffer) { + public Pair getRegionStorePairByKey(ByteString key, BackOffer backOffer) { return getRegionStorePairByKey(key, TiStoreType.TiKV, backOffer); } - public Pair getRegionStorePairByKey(ByteString key) { + public Pair getRegionStorePairByKey(ByteString key) { return getRegionStorePairByKey(key, TiStoreType.TiKV); } - public Pair getRegionStorePairByKey(ByteString key, TiStoreType storeType) { + public Pair getRegionStorePairByKey(ByteString key, TiStoreType storeType) { return getRegionStorePairByKey(key, storeType, ConcreteBackOffer.newGetBackOff()); } - public Pair getRegionStorePairByKey( + public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { TiRegion region = cache.getRegionByKey(key, backOffer); if (region == null) { @@ -133,7 +132,7 @@ public Pair getRegionStorePairByKey( throw new TiClientInternalException("Region invalid: " + region.toString()); } - Store store = null; + TiStore store = null; if (storeType == TiStoreType.TiKV) { Peer peer = region.getCurrentReplica(); store = cache.getStoreById(peer.getStoreId(), backOffer); @@ -143,8 +142,8 @@ public Pair getRegionStorePairByKey( } else { outerLoop: for (Peer peer : region.getLearnerList()) { - Store s = getStoreById(peer.getStoreId(), backOffer); - for (Metapb.StoreLabel label : s.getLabelsList()) { + TiStore s = getStoreById(peer.getStoreId(), backOffer); + for (Metapb.StoreLabel label : s.getStore().getLabelsList()) { if (label.getKey().equals(storeType.getLabelKey()) && label.getValue().equals(storeType.getLabelValue())) { store = s; @@ -166,11 +165,11 @@ public Pair getRegionStorePairByKey( return Pair.create(region, store); } - public Store getStoreById(long id) { + public TiStore getStoreById(long id) { return getStoreById(id, ConcreteBackOffer.newGetBackOff()); } - public Store getStoreById(long id, BackOffer backOffer) { + public TiStore getStoreById(long id, BackOffer backOffer) { return cache.getStoreById(id, backOffer); } @@ -196,6 +195,10 @@ public synchronized TiRegion updateLeader(TiRegion region, long storeId) { return null; } + public boolean updateRegion(TiRegion region) { + return cache.updateRegion(region); + } + /** * Clears all cache when a TiKV server does not respond * @@ -222,7 +225,7 @@ public void invalidateRegion(TiRegion region) { public static class RegionCache { private final Map regionCache; - private final Map storeCache; + private final Map storeCache; private final RangeMap keyToRegionIdCache; private final ReadOnlyPDClient pdClient; @@ -312,6 +315,26 @@ public synchronized void invalidateRegion(TiRegion region) { } } + public synchronized boolean updateRegion(TiRegion region) { + try { + if (logger.isDebugEnabled()) { + logger.debug(String.format("invalidateRegion ID[%s]", region.getId())); + } + TiRegion oldRegion = regionCache.get(region.getId()); + if (oldRegion != null && !oldRegion.getRegionEpoch().equals(region.getRegionEpoch())) { + return false; + } else { + if (oldRegion != null) { + keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey())); + } + putRegion(region); + return true; + } + } catch (Exception ignore) { + return false; + } + } + public synchronized void invalidateAllRegionForStore(long storeId) { List regionToRemove = new ArrayList<>(); for (TiRegion r : regionCache.values()) { @@ -334,13 +357,13 @@ public synchronized void invalidateStore(long storeId) { storeCache.remove(storeId); } - public synchronized Store getStoreById(long id, BackOffer backOffer) { + public synchronized TiStore getStoreById(long id, BackOffer backOffer) { try { - Store store = storeCache.get(id); + TiStore store = storeCache.get(id); if (store == null) { - store = pdClient.getStore(backOffer, id); + store = new TiStore(pdClient.getStore(backOffer, id)); } - if (store.getState().equals(StoreState.Tombstone)) { + if (store.getStore().getState().equals(StoreState.Tombstone)) { return null; } storeCache.put(id, store); diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 896f3060499..5fbc1e5a66f 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -26,6 +26,8 @@ import com.pingcap.tidb.tipb.DAGRequest; import com.pingcap.tidb.tipb.SelectResponse; import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; import io.prometheus.client.Histogram; import java.util.*; import java.util.function.Supplier; @@ -44,7 +46,6 @@ import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Kvrpcpb.*; -import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvStub; @@ -88,7 +89,7 @@ private synchronized Boolean getIsV4() { private RegionStoreClient( TiConfiguration conf, TiRegion region, - Store store, + TiStore store, TiStoreType storeType, ChannelFactory channelFactory, TikvBlockingStub blockingStub, @@ -113,10 +114,10 @@ private RegionStoreClient( clientBuilder); } else { - Store tikvStore = + TiStore tikvStore = regionManager.getRegionStorePairByKey(region.getStartKey(), TiStoreType.TiKV).second; - String addressStr = tikvStore.getAddress(); + String addressStr = tikvStore.getStore().getAddress(); if (logger.isDebugEnabled()) { logger.debug(String.format("Create region store client on address %s", addressStr)); } @@ -1245,20 +1246,34 @@ public RegionStoreClientBuilder( this.pdClient = pdClient; } - public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeType) + public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType storeType) throws GrpcException { Objects.requireNonNull(region, "region is null"); Objects.requireNonNull(store, "store is null"); Objects.requireNonNull(storeType, "storeType is null"); - String addressStr = store.getAddress(); + String addressStr = store.getStore().getAddress(); if (logger.isDebugEnabled()) { logger.debug(String.format("Create region store client on address %s", addressStr)); } - ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); - - TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel); - TikvStub asyncStub = TikvGrpc.newStub(channel); + ManagedChannel channel = null; + + TikvBlockingStub blockingStub = null; + TikvStub asyncStub = null; + + if (conf.getEnableGrpcForward() && region.getProxyStore() != null) { + addressStr = region.getProxyStore().getStore().getAddress(); + channel = + channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); + asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); + } else { + channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); + blockingStub = TikvGrpc.newBlockingStub(channel); + asyncStub = TikvGrpc.newStub(channel); + } return new RegionStoreClient( conf, @@ -1273,7 +1288,8 @@ public RegionStoreClient build(TiRegion region, Store store, TiStoreType storeTy this); } - public synchronized RegionStoreClient build(TiRegion region, Store store) throws GrpcException { + public synchronized RegionStoreClient build(TiRegion region, TiStore store) + throws GrpcException { return build(region, store, TiStoreType.TiKV); } @@ -1283,12 +1299,12 @@ public synchronized RegionStoreClient build(ByteString key) throws GrpcException public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException { - Pair pair = regionManager.getRegionStorePairByKey(key, storeType); + Pair pair = regionManager.getRegionStorePairByKey(key, storeType); return build(pair.first, pair.second, storeType); } public synchronized RegionStoreClient build(TiRegion region) throws GrpcException { - Store store = regionManager.getStoreById(region.getLeader().getStoreId()); + TiStore store = regionManager.getStoreById(region.getLeader().getStoreId()); return build(region, store, TiStoreType.TiKV); } diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index beedd105c18..1e8a8c95323 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -50,13 +50,13 @@ public class TiRegion implements Serializable { private final Peer leader; private final ReplicaSelector replicaSelector; private final List replicaList; - private final Metapb.Store proxyStore; + private final TiStore proxyStore; private int replicaIdx; public TiRegion( Region meta, Peer leader, - Metapb.Store proxyStore, + TiStore proxyStore, IsolationLevel isolationLevel, Kvrpcpb.CommandPri commandPri, KVMode kvMode, @@ -200,6 +200,10 @@ public RegionVerID getVerID() { meta.getId(), meta.getRegionEpoch().getConfVer(), meta.getRegionEpoch().getVersion()); } + public TiStore getProxyStore() { + return proxyStore; + } + /** * switches current peer to the one on specific store. It return false if no peer matches the * storeID. @@ -224,6 +228,17 @@ public TiRegion switchPeer(long leaderStoreID) { return null; } + public TiRegion switchProxyStore(TiStore store) { + return new TiRegion( + this.meta, + this.leader, + store, + this.isolationLevel, + this.commandPri, + this.kvMode, + this.replicaSelector); + } + public boolean isMoreThan(ByteString key) { return FastByteComparisons.compareTo( meta.getStartKey().toByteArray(), diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java new file mode 100644 index 00000000000..7877da3084d --- /dev/null +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -0,0 +1,26 @@ +package org.tikv.common.region; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.tikv.kvproto.Metapb; + +public class TiStore { + private final Metapb.Store store; + private AtomicBoolean unreachable; + + public TiStore(Metapb.Store store) { + this.store = store; + this.unreachable.set(false); + } + + public void invalid() { + this.unreachable.set(true); + } + + public boolean isUnreachable() { + return this.unreachable.get(); + } + + public Metapb.Store getStore() { + return this.store; + } +} diff --git a/src/main/java/org/tikv/common/util/RangeSplitter.java b/src/main/java/org/tikv/common/util/RangeSplitter.java index 4c5187a12a9..475e799f71c 100644 --- a/src/main/java/org/tikv/common/util/RangeSplitter.java +++ b/src/main/java/org/tikv/common/util/RangeSplitter.java @@ -29,9 +29,9 @@ import org.tikv.common.pd.PDUtils; import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.region.TiStoreType; import org.tikv.kvproto.Coprocessor.KeyRange; -import org.tikv.kvproto.Metapb; public class RangeSplitter { private final RegionManager regionManager; @@ -51,12 +51,11 @@ public static RangeSplitter newSplitter(RegionManager mgr) { * @param handles Handle list * @return map */ - public Map, TLongArrayList> groupByAndSortHandlesByRegionId( + public Map, TLongArrayList> groupByAndSortHandlesByRegionId( long tableId, TLongArrayList handles) { TLongObjectHashMap regionHandles = new TLongObjectHashMap<>(); - TLongObjectHashMap> idToRegionStorePair = - new TLongObjectHashMap<>(); - Map, TLongArrayList> result = new HashMap<>(); + TLongObjectHashMap> idToRegionStorePair = new TLongObjectHashMap<>(); + Map, TLongArrayList> result = new HashMap<>(); handles.sort(); byte[] endKey = null; @@ -71,7 +70,7 @@ public Map, TLongArrayList> groupByAndSortHandlesBy regionHandles.put(curRegion.getId(), handlesInCurRegion); handlesInCurRegion = new TLongArrayList(); } - Pair regionStorePair = + Pair regionStorePair = regionManager.getRegionStorePairByKey(ByteString.copyFrom(key.getBytes())); curRegion = regionStorePair.first; idToRegionStorePair.put(curRegion.getId(), regionStorePair); @@ -84,7 +83,7 @@ public Map, TLongArrayList> groupByAndSortHandlesBy } regionHandles.forEachEntry( (k, v) -> { - Pair regionStorePair = idToRegionStorePair.get(k); + Pair regionStorePair = idToRegionStorePair.get(k); result.put(regionStorePair, v); return true; }); @@ -110,7 +109,7 @@ private List splitAndSortHandlesByRegion(long tableId, TLongArrayLis // Max value for current index handle range ImmutableList.Builder regionTasks = ImmutableList.builder(); - Map, TLongArrayList> regionHandlesMap = + Map, TLongArrayList> regionHandlesMap = groupByAndSortHandlesByRegionId(tableId, handles); regionHandlesMap.forEach((k, v) -> createTask(0, v.size(), tableId, v, k, regionTasks)); @@ -123,7 +122,7 @@ private void createTask( int endPos, long tableId, TLongArrayList handles, - Pair regionStorePair, + Pair regionStorePair, ImmutableList.Builder regionTasks) { List newKeyRanges = new ArrayList<>(endPos - startPos + 1); long startHandle = handles.get(startPos); @@ -163,10 +162,10 @@ public List splitRangeByRegion(List keyRanges, TiStoreType int i = 0; KeyRange range = keyRanges.get(i++); Map> idToRange = new HashMap<>(); // region id to keyRange list - Map> idToRegion = new HashMap<>(); + Map> idToRegion = new HashMap<>(); while (true) { - Pair regionStorePair = + Pair regionStorePair = regionManager.getRegionStorePairByKey(range.getStart(), storeType); if (regionStorePair == null) { @@ -203,7 +202,7 @@ public List splitRangeByRegion(List keyRanges, TiStoreType ImmutableList.Builder resultBuilder = ImmutableList.builder(); idToRange.forEach( (k, v) -> { - Pair regionStorePair = idToRegion.get(k); + Pair regionStorePair = idToRegion.get(k); resultBuilder.add(new RegionTask(regionStorePair.first, regionStorePair.second, v)); }); return resultBuilder.build(); @@ -221,24 +220,23 @@ public List splitRangeByRegion(List keyRanges) { public static class RegionTask implements Serializable { private final TiRegion region; - private final Metapb.Store store; + private final TiStore store; private final List ranges; private final String host; - RegionTask(TiRegion region, Metapb.Store store, List ranges) { + RegionTask(TiRegion region, TiStore store, List ranges) { this.region = region; this.store = store; this.ranges = ranges; String host = null; try { - host = PDUtils.addrToUri(store.getAddress()).getHost(); + host = PDUtils.addrToUri(store.getStore().getAddress()).getHost(); } catch (Exception ignored) { } this.host = host; } - public static RegionTask newInstance( - TiRegion region, Metapb.Store store, List ranges) { + public static RegionTask newInstance(TiRegion region, TiStore store, List ranges) { return new RegionTask(region, store, ranges); } @@ -246,7 +244,7 @@ public TiRegion getRegion() { return region; } - public Metapb.Store getStore() { + public TiStore getStore() { return store; } diff --git a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java index d9006de34ea..020068e272a 100644 --- a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java +++ b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java @@ -26,10 +26,10 @@ import org.tikv.common.region.RegionManager; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.kvproto.Kvrpcpb; -import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; public interface AbstractLockResolverClient { @@ -69,20 +69,21 @@ static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) { static AbstractLockResolverClient getInstance( TiConfiguration conf, TiRegion region, - Metapb.Store store, + TiStore store, TikvGrpc.TikvBlockingStub blockingStub, TikvGrpc.TikvStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { - if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V3) < 0) { + if (StoreVersion.compareTo(store.getStore().getVersion(), Version.RESOLVE_LOCK_V3) < 0) { return new LockResolverClientV2( conf, region, store, blockingStub, asyncStub, channelFactory, regionManager); - } else if (StoreVersion.compareTo(store.getVersion(), Version.RESOLVE_LOCK_V4) < 0) { + } else if (StoreVersion.compareTo(store.getStore().getVersion(), Version.RESOLVE_LOCK_V4) < 0) { return new LockResolverClientV3( conf, region, + store, blockingStub, asyncStub, channelFactory, @@ -93,6 +94,7 @@ static AbstractLockResolverClient getInstance( return new LockResolverClientV4( conf, region, + store, blockingStub, asyncStub, channelFactory, diff --git a/src/main/java/org/tikv/txn/LockResolverClientV2.java b/src/main/java/org/tikv/txn/LockResolverClientV2.java index 718ddec50f5..3df5966abda 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV2.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV2.java @@ -42,6 +42,7 @@ import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiRegion.RegionVerID; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.TsoUtils; @@ -49,7 +50,6 @@ import org.tikv.kvproto.Kvrpcpb.CleanupResponse; import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest; import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse; -import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvStub; @@ -75,7 +75,7 @@ public class LockResolverClientV2 extends AbstractRegionStoreClient public LockResolverClientV2( TiConfiguration conf, TiRegion region, - Metapb.Store store, + TiStore store, TikvBlockingStub blockingStub, TikvStub asyncStub, ChannelFactory channelFactory, diff --git a/src/main/java/org/tikv/txn/LockResolverClientV3.java b/src/main/java/org/tikv/txn/LockResolverClientV3.java index 4ec1f676e8b..0b8d3c89a8c 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV3.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV3.java @@ -39,10 +39,7 @@ import org.tikv.common.exception.RegionException; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.operation.KVErrorHandler; -import org.tikv.common.region.AbstractRegionStoreClient; -import org.tikv.common.region.RegionManager; -import org.tikv.common.region.RegionStoreClient; -import org.tikv.common.region.TiRegion; +import org.tikv.common.region.*; import org.tikv.common.region.TiRegion.RegionVerID; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; @@ -79,13 +76,14 @@ public class LockResolverClientV3 extends AbstractRegionStoreClient public LockResolverClientV3( TiConfiguration conf, TiRegion region, + TiStore store, TikvBlockingStub blockingStub, TikvStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { - super(conf, region, channelFactory, blockingStub, asyncStub, regionManager); + super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager); resolved = new HashMap<>(); recentResolved = new LinkedList<>(); readWriteLock = new ReentrantReadWriteLock(); diff --git a/src/main/java/org/tikv/txn/LockResolverClientV4.java b/src/main/java/org/tikv/txn/LockResolverClientV4.java index 9b23733553c..07a5552f0f7 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV4.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV4.java @@ -39,10 +39,7 @@ import org.tikv.common.exception.RegionException; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.operation.KVErrorHandler; -import org.tikv.common.region.AbstractRegionStoreClient; -import org.tikv.common.region.RegionManager; -import org.tikv.common.region.RegionStoreClient; -import org.tikv.common.region.TiRegion; +import org.tikv.common.region.*; import org.tikv.common.region.TiRegion.RegionVerID; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; @@ -79,13 +76,14 @@ public class LockResolverClientV4 extends AbstractRegionStoreClient public LockResolverClientV4( TiConfiguration conf, TiRegion region, + TiStore store, TikvBlockingStub blockingStub, TikvStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { - super(conf, region, channelFactory, blockingStub, asyncStub, regionManager); + super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager); resolved = new HashMap<>(); recentResolved = new LinkedList<>(); readWriteLock = new ReentrantReadWriteLock(); diff --git a/src/main/java/org/tikv/txn/TTLManager.java b/src/main/java/org/tikv/txn/TTLManager.java index 8424ff52f96..82f64fafe5f 100644 --- a/src/main/java/org/tikv/txn/TTLManager.java +++ b/src/main/java/org/tikv/txn/TTLManager.java @@ -30,11 +30,11 @@ import org.tikv.common.meta.TiTimestamp; import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.Pair; -import org.tikv.kvproto.Metapb; import org.tikv.txn.type.ClientRPCResult; /** @@ -105,9 +105,9 @@ private void doKeepAlive() { } private void sendTxnHeartBeat(BackOffer bo, long ttl) { - Pair pair = regionManager.getRegionStorePairByKey(primaryLock); + Pair pair = regionManager.getRegionStorePairByKey(primaryLock); TiRegion tiRegion = pair.first; - Metapb.Store store = pair.second; + TiStore store = pair.second; ClientRPCResult result = kvClient.txnHeartBeat(bo, primaryLock, startTS, ttl, tiRegion, store); @@ -121,7 +121,7 @@ private void sendTxnHeartBeat(BackOffer bo, long ttl) { new GrpcException( String.format("sendTxnHeartBeat failed, regionId=%s", tiRegion.getId()), result.getException())); - this.regionManager.invalidateStore(store.getId()); + this.regionManager.invalidateStore(store.getStore().getId()); this.regionManager.invalidateRegion(tiRegion); // re-split keys and commit again. sendTxnHeartBeat(bo, ttl); diff --git a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java index 8d7e8af42ad..ce2568261de 100644 --- a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java +++ b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java @@ -38,13 +38,13 @@ import org.tikv.common.exception.TiBatchWriteException; import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.Pair; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb.Op; -import org.tikv.kvproto.Metapb; import org.tikv.txn.type.BatchKeys; import org.tikv.txn.type.ClientRPCResult; import org.tikv.txn.type.GroupKeyResult; @@ -150,9 +150,9 @@ public void prewritePrimaryKey(BackOffer backOffer, byte[] primaryKey, byte[] va private void doPrewritePrimaryKeyWithRetry(BackOffer backOffer, ByteString key, ByteString value) throws TiBatchWriteException { - Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer); + Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer); TiRegion tiRegion = pair.first; - Metapb.Store store = pair.second; + TiStore store = pair.second; Kvrpcpb.Mutation mutation; if (!value.isEmpty()) { @@ -205,9 +205,9 @@ public void commitPrimaryKey(BackOffer backOffer, byte[] key, long commitTs) private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, long commitTs) throws TiBatchWriteException { - Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer); + Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer); TiRegion tiRegion = pair.first; - Metapb.Store store = pair.second; + TiStore store = pair.second; ByteString[] keys = new ByteString[] {key}; // send rpc request to tikv server @@ -339,11 +339,11 @@ private void doPrewriteSecondaryKeysInBatchesWithRetry( // groups keys by region GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer); List batchKeyList = new LinkedList<>(); - Map, List> groupKeyMap = groupResult.getGroupsResult(); + Map, List> groupKeyMap = groupResult.getGroupsResult(); - for (Map.Entry, List> entry : groupKeyMap.entrySet()) { + for (Map.Entry, List> entry : groupKeyMap.entrySet()) { TiRegion tiRegion = entry.getKey().first; - Metapb.Store store = entry.getKey().second; + TiStore store = entry.getKey().second; this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), true, mutations); } @@ -454,7 +454,7 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry( private void appendBatchBySize( List batchKeyList, TiRegion tiRegion, - Metapb.Store store, + TiStore store, List keys, boolean sizeIncludeValue, Map mutations) { @@ -575,11 +575,11 @@ private void doCommitSecondaryKeysWithRetry( // groups keys by region GroupKeyResult groupResult = this.groupKeysByRegion(keys, size, backOffer); List batchKeyList = new ArrayList<>(); - Map, List> groupKeyMap = groupResult.getGroupsResult(); + Map, List> groupKeyMap = groupResult.getGroupsResult(); - for (Map.Entry, List> entry : groupKeyMap.entrySet()) { + for (Map.Entry, List> entry : groupKeyMap.entrySet()) { TiRegion tiRegion = entry.getKey().first; - Metapb.Store store = entry.getKey().second; + TiStore store = entry.getKey().second; this.appendBatchBySize(batchKeyList, tiRegion, store, entry.getValue(), false, null); } @@ -619,13 +619,12 @@ private void doCommitSecondaryKeySingleBatchWithRetry( private GroupKeyResult groupKeysByRegion(ByteString[] keys, int size, BackOffer backOffer) throws TiBatchWriteException { - Map, List> groups = new HashMap<>(); + Map, List> groups = new HashMap<>(); int index = 0; try { for (; index < size; index++) { ByteString key = keys[index]; - Pair pair = - this.regionManager.getRegionStorePairByKey(key, backOffer); + Pair pair = this.regionManager.getRegionStorePairByKey(key, backOffer); if (pair != null) { groups.computeIfAbsent(pair, e -> new ArrayList<>()).add(key); } diff --git a/src/main/java/org/tikv/txn/TxnKVClient.java b/src/main/java/org/tikv/txn/TxnKVClient.java index e03a1a1a51d..e16876f2373 100644 --- a/src/main/java/org/tikv/txn/TxnKVClient.java +++ b/src/main/java/org/tikv/txn/TxnKVClient.java @@ -33,11 +33,11 @@ import org.tikv.common.region.RegionManager; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Kvrpcpb; -import org.tikv.kvproto.Metapb; import org.tikv.txn.type.ClientRPCResult; /** KV client of transaction APIs for GET/PUT/DELETE/SCAN */ @@ -94,7 +94,7 @@ public ClientRPCResult prewrite( long lockTTL, long startTs, TiRegion tiRegion, - Metapb.Store store) { + TiStore store) { ClientRPCResult result = new ClientRPCResult(true, false, null); // send request RegionStoreClient client = clientBuilder.build(tiRegion, store); @@ -116,7 +116,7 @@ public ClientRPCResult txnHeartBeat( long startTs, long ttl, TiRegion tiRegion, - Metapb.Store store) { + TiStore store) { ClientRPCResult result = new ClientRPCResult(true, false, null); // send request RegionStoreClient client = clientBuilder.build(tiRegion, store); @@ -148,7 +148,7 @@ public ClientRPCResult commit( long startTs, long commitTs, TiRegion tiRegion, - Metapb.Store store) { + TiStore store) { ClientRPCResult result = new ClientRPCResult(true, false, null); // send request RegionStoreClient client = clientBuilder.build(tiRegion, store); diff --git a/src/main/java/org/tikv/txn/type/BatchKeys.java b/src/main/java/org/tikv/txn/type/BatchKeys.java index 57657d9958a..7b61c948f0c 100644 --- a/src/main/java/org/tikv/txn/type/BatchKeys.java +++ b/src/main/java/org/tikv/txn/type/BatchKeys.java @@ -19,16 +19,15 @@ import java.util.ArrayList; import java.util.List; import org.tikv.common.region.TiRegion; -import org.tikv.kvproto.Metapb; +import org.tikv.common.region.TiStore; public class BatchKeys { private final TiRegion region; - private final Metapb.Store store; + private final TiStore store; private List keys; private final int sizeInBytes; - public BatchKeys( - TiRegion region, Metapb.Store store, List keysInput, int sizeInBytes) { + public BatchKeys(TiRegion region, TiStore store, List keysInput, int sizeInBytes) { this.region = region; this.store = store; this.keys = new ArrayList<>(); @@ -48,7 +47,7 @@ public TiRegion getRegion() { return region; } - public Metapb.Store getStore() { + public TiStore getStore() { return store; } diff --git a/src/main/java/org/tikv/txn/type/GroupKeyResult.java b/src/main/java/org/tikv/txn/type/GroupKeyResult.java index 334dac35daf..837385337a1 100644 --- a/src/main/java/org/tikv/txn/type/GroupKeyResult.java +++ b/src/main/java/org/tikv/txn/type/GroupKeyResult.java @@ -20,22 +20,22 @@ import java.util.List; import java.util.Map; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.Pair; -import org.tikv.kvproto.Metapb; public class GroupKeyResult { - private Map, List> groupsResult; + private Map, List> groupsResult; public GroupKeyResult() { this.groupsResult = new HashMap<>(); } - public Map, List> getGroupsResult() { + public Map, List> getGroupsResult() { return groupsResult; } - public void setGroupsResult(Map, List> groupsResult) { + public void setGroupsResult(Map, List> groupsResult) { this.groupsResult = groupsResult; } } diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index bfce6db50cd..a69e10ba383 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -32,6 +32,7 @@ public void setUp() throws IOException { new TiRegion( r, r.getPeers(0), + null, session.getConf().getIsolationLevel(), session.getConf().getCommandPriority(), KVMode.TXN, From 0237ac5bd6e24eb24521f1cabe86d2ab11a6cdca Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 16 Jun 2021 20:26:57 +0800 Subject: [PATCH 04/10] check health periodly Signed-off-by: Little-Wallace --- pom.xml | 2 +- src/main/java/org/tikv/common/TiSession.java | 5 +- .../region/AbstractRegionStoreClient.java | 34 ++++++-- .../common/region/RegionErrorReceiver.java | 1 + .../org/tikv/common/region/RegionManager.java | 30 ++++++-- .../tikv/common/region/RegionStoreClient.java | 9 ++- .../java/org/tikv/common/region/TiStore.java | 12 ++- .../region/UnreachableStoreChecker.java | 77 +++++++++++++++++++ .../org/tikv/common/RegionManagerTest.java | 8 +- .../tikv/common/RegionStoreClientTest.java | 4 +- 10 files changed, 161 insertions(+), 21 deletions(-) create mode 100644 src/main/java/org/tikv/common/region/UnreachableStoreChecker.java diff --git a/pom.xml b/pom.xml index ed6ed478ae9..c67ed6b7585 100644 --- a/pom.xml +++ b/pom.xml @@ -128,7 +128,7 @@ io.grpc - grpc-all + grpc-services ${grpc.version} diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index ff0502e46c3..82f833b6fe9 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -203,7 +203,10 @@ public synchronized RegionManager getRegionManager() { if (regionManager == null) { regionManager = new RegionManager( - getPDClient(), this.cacheInvalidateCallback, this.enableGrpcForward); + getPDClient(), + this.cacheInvalidateCallback, + this.channelFactory, + this.enableGrpcForward); } res = regionManager; } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 72cd9b3a872..6093f6aa606 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -22,6 +22,9 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.MetadataUtils; import java.util.List; import java.util.concurrent.TimeUnit; @@ -77,7 +80,7 @@ public void close() throws GrpcException {} /** * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed * - * @param newStore the new store presented by NotLeader Error + * @param newRegion the new region presented by NotLeader Error * @return false when re-split is needed. */ @Override @@ -110,15 +113,21 @@ public boolean onStoreUnreachable() { if (checkHealth(store)) { return true; } else { - store.invalid(); + if (store.invalid()) { + this.regionManager.scheduleHealthCheckJob(store); + } region = region.switchProxyStore(null); regionManager.updateRegion(region); } } else { - if (checkHealth(targetStore)) { - return true; - } else { - targetStore.invalid(); + if (!targetStore.isUnreachable()) { + if (checkHealth(targetStore)) { + return true; + } else { + if (targetStore.invalid()) { + this.regionManager.scheduleHealthCheckJob(targetStore); + } + } } } TiRegion proxyRegion = switchProxyStore(); @@ -138,6 +147,19 @@ public boolean onStoreUnreachable() { } private boolean checkHealth(TiStore store) { + String addressStr = store.getStore().getAddress(); + ManagedChannel channel = + channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); + HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel); + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + try { + HealthCheckResponse resp = stub.check(req); + if (resp.getStatus() != HealthCheckResponse.ServingStatus.SERVING) { + return false; + } + } catch (Exception e) { + return false; + } return true; } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 3f4af86c818..4bee1356eab 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -20,6 +20,7 @@ public interface RegionErrorReceiver { boolean onNotLeader(TiRegion region); + /// return whether we need to retry this request. boolean onStoreUnreachable(); TiRegion getRegion(); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 38645717229..075ee04bf60 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -28,6 +28,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +40,7 @@ import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.key.Key; import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; @@ -49,7 +53,8 @@ public class RegionManager { // TODO: the region cache logic need rewrite. // https://github.com/pingcap/tispark/issues/1170 private final RegionCache cache; - private final boolean enableGrpcForward; + private final ScheduledExecutorService executor; + private final UnreachableStoreChecker storeChecker; private final Function cacheInvalidateCallback; @@ -65,22 +70,33 @@ public RegionManager( ReadOnlyPDClient pdClient, Function cacheInvalidateCallback) { this.cache = new RegionCache(pdClient); this.cacheInvalidateCallback = cacheInvalidateCallback; - this.enableGrpcForward = false; + this.executor = null; + this.storeChecker = null; } public RegionManager( ReadOnlyPDClient pdClient, Function cacheInvalidateCallback, + ChannelFactory channelFactory, boolean enableGrpcForward) { this.cache = new RegionCache(pdClient); this.cacheInvalidateCallback = cacheInvalidateCallback; - this.enableGrpcForward = enableGrpcForward; + if (enableGrpcForward) { + UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient); + this.storeChecker = storeChecker; + this.executor = Executors.newScheduledThreadPool(1); + this.executor.schedule(storeChecker, 2, TimeUnit.SECONDS); + } else { + this.storeChecker = null; + this.executor = null; + } } public RegionManager(ReadOnlyPDClient pdClient) { this.cache = new RegionCache(pdClient); this.cacheInvalidateCallback = null; - this.enableGrpcForward = false; + this.storeChecker = null; + this.executor = null; } public Function getCacheInvalidateCallback() { @@ -209,7 +225,7 @@ public void onRequestFail(TiRegion region) { } private void onRequestFail(TiRegion region, long storeId) { - if (this.enableGrpcForward) { + if (this.storeChecker != null) { cache.invalidateRegion(region); cache.invalidateAllRegionForStore(storeId); } @@ -223,6 +239,10 @@ public void invalidateRegion(TiRegion region) { cache.invalidateRegion(region); } + public void scheduleHealthCheckJob(TiStore store) { + this.storeChecker.scheduleStoreHealthCheck(store); + } + public static class RegionCache { private final Map regionCache; private final Map storeCache; diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 5fbc1e5a66f..f9f31f25840 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -1261,7 +1261,7 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store TikvBlockingStub blockingStub = null; TikvStub asyncStub = null; - if (conf.getEnableGrpcForward() && region.getProxyStore() != null) { + if (conf.getEnableGrpcForward() && region.getProxyStore() != null && store.isUnreachable()) { addressStr = region.getProxyStore().getStore().getAddress(); channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); @@ -1270,6 +1270,13 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); } else { + // If the store is reachable, which is update by check-health thread + if (!store.isUnreachable()) { + TiRegion newRegion = region.switchProxyStore(null); + if (regionManager.updateRegion(newRegion)) { + region = newRegion; + } + } channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index 7877da3084d..0527c66bb47 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -12,8 +12,12 @@ public TiStore(Metapb.Store store) { this.unreachable.set(false); } - public void invalid() { - this.unreachable.set(true); + public boolean invalid() { + return this.unreachable.compareAndSet(false, true); + } + + public void markReachable() { + this.unreachable.set(false); } public boolean isUnreachable() { @@ -23,4 +27,8 @@ public boolean isUnreachable() { public Metapb.Store getStore() { return this.store; } + + public long getId() { + return this.store.getId(); + } } diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java new file mode 100644 index 00000000000..4cd809473e6 --- /dev/null +++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java @@ -0,0 +1,77 @@ +package org.tikv.common.region; + +import io.grpc.ManagedChannel; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import org.tikv.common.ReadOnlyPDClient; +import org.tikv.common.util.ChannelFactory; + +public class UnreachableStoreChecker implements Callable { + private ConcurrentHashMap stores; + private List taskQueue; + private final ChannelFactory channelFactory; + private final ReadOnlyPDClient pdClient; + + public UnreachableStoreChecker(ChannelFactory channelFactory, ReadOnlyPDClient pdClient) { + this.stores = new ConcurrentHashMap(); + this.taskQueue = new LinkedList<>(); + this.channelFactory = channelFactory; + this.pdClient = pdClient; + } + + public void scheduleStoreHealthCheck(TiStore store) { + TiStore oldStore = this.stores.get(Long.valueOf(store.getId())); + if (oldStore != null) { + return; + } + synchronized (this.taskQueue) { + this.stores.put(Long.valueOf(store.getId()), store); + this.taskQueue.add(store); + } + } + + private List getUnhealthStore() { + synchronized (this.taskQueue) { + List unhealthStore = new LinkedList<>(); + unhealthStore.addAll(this.taskQueue); + return unhealthStore; + } + } + + @Override + public Object call() throws Exception { + List unhealthStore = getUnhealthStore(); + List restStore = new LinkedList<>(); + for (TiStore store : unhealthStore) { + String addressStr = store.getStore().getAddress(); + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); + HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel); + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + try { + HealthCheckResponse resp = stub.check(req); + if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { + store.markReachable(); + this.stores.remove(Long.valueOf(store.getId())); + continue; + } + } finally { + } + restStore.add(store); + } + synchronized (this.taskQueue) { + int idx = unhealthStore.size(); + if (idx < this.taskQueue.size()) { + for (int i = idx; i < this.taskQueue.size(); i++) { + restStore.add(this.taskQueue.get(i)); + } + } + this.taskQueue = restStore; + } + return null; + } +} diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index d382e21b520..112d5cb0ec6 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -26,10 +26,10 @@ import org.tikv.common.key.Key; import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; import org.tikv.common.util.KeyRangeUtils; import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; -import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.Metapb.StoreState; public class RegionManagerTest extends PDMockServerTest { @@ -115,7 +115,7 @@ public void getStoreByKey() throws Exception { Metapb.StoreState.Up, GrpcUtils.makeStoreLabel("k1", "v1"), GrpcUtils.makeStoreLabel("k2", "v2")))); - Pair pair = mgr.getRegionStorePairByKey(searchKey); + Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); assertEquals(pair.first.getId(), storeId); } @@ -133,8 +133,8 @@ public void getStoreById() throws Exception { Metapb.StoreState.Up, GrpcUtils.makeStoreLabel("k1", "v1"), GrpcUtils.makeStoreLabel("k2", "v2")))); - Store store = mgr.getStoreById(storeId); - assertEquals(store.getId(), storeId); + TiStore store = mgr.getStoreById(storeId); + assertEquals(store.getStore().getId(), storeId); pdServer.addGetStoreResp( GrpcUtils.makeGetStoreResponse( diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java index e74c5823aef..ec3fe88ccfd 100644 --- a/src/test/java/org/tikv/common/RegionStoreClientTest.java +++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java @@ -24,6 +24,7 @@ import org.tikv.common.region.RegionManager; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; +import org.tikv.common.region.TiStore; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Kvrpcpb; @@ -40,13 +41,14 @@ private RegionStoreClient createClientV3() { } private RegionStoreClient createClient(String version) { - Metapb.Store store = + Metapb.Store meta = Metapb.Store.newBuilder() .setAddress(LOCAL_ADDR + ":" + port) .setId(1) .setState(Metapb.StoreState.Up) .setVersion(version) .build(); + TiStore store = new TiStore(meta); RegionStoreClientBuilder builder = new RegionStoreClientBuilder( From 5759e258ffe930729332c51f849952ef515f71fd Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 17 Jun 2021 22:05:26 +0800 Subject: [PATCH 05/10] fix null pointer bug Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/region/TiStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index 0527c66bb47..2f799a08c07 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -9,7 +9,7 @@ public class TiStore { public TiStore(Metapb.Store store) { this.store = store; - this.unreachable.set(false); + this.unreachable = new AtomicBoolean(false); } public boolean invalid() { From 403c1edab37d7ec257a196712e44a0d65cc92e43 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 18 Jun 2021 10:40:08 +0800 Subject: [PATCH 06/10] fix some bug Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/ConfigUtils.java | 1 + .../java/org/tikv/common/TiConfiguration.java | 2 +- .../common/region/AbstractRegionStoreClient.java | 16 ++++++---------- .../java/org/tikv/common/region/TiStore.java | 2 +- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index cada32623af..1b0ae0dd9e1 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -77,6 +77,7 @@ public class ConfigUtils { public static final boolean DEF_METRICS_ENABLE = false; public static final int DEF_METRICS_PORT = 3140; public static final String DEF_TIKV_NETWORK_MAPPING_NAME = ""; + public static final boolean DEF_GRPC_FORWARD_ENABLE = true; public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 21532071cfe..7f782cb8dc0 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -75,7 +75,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE); setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT); setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME); - setIfMissing(TIKV_ENABLE_GRPC_FORWARD, false); + setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE); } public static void listAll() { diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 6093f6aa606..fc97152b6c0 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -110,21 +110,15 @@ public boolean onStoreUnreachable() { } if (region.getProxyStore() != null) { TiStore store = region.getProxyStore(); - if (checkHealth(store)) { - return true; - } else { - if (store.invalid()) { - this.regionManager.scheduleHealthCheckJob(store); - } - region = region.switchProxyStore(null); - regionManager.updateRegion(region); + if (!checkHealth(store) && store.markUnreachable()) { + this.regionManager.scheduleHealthCheckJob(store); } } else { if (!targetStore.isUnreachable()) { if (checkHealth(targetStore)) { return true; } else { - if (targetStore.invalid()) { + if (targetStore.markUnreachable()) { this.regionManager.scheduleHealthCheckJob(targetStore); } } @@ -172,7 +166,9 @@ private TiRegion switchProxyStore() { if (peer.getStoreId() != region.getLeader().getStoreId()) { if (region.getProxyStore() == null) { TiStore store = regionManager.getStoreById(peer.getStoreId()); - return region.switchProxyStore(store); + if (checkHealth(store)) { + return region.switchProxyStore(store); + } } else { TiStore proxyStore = region.getProxyStore(); if (peer.getStoreId() == proxyStore.getStore().getId()) { diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index 2f799a08c07..db1f2f30443 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -12,7 +12,7 @@ public TiStore(Metapb.Store store) { this.unreachable = new AtomicBoolean(false); } - public boolean invalid() { + public boolean markUnreachable() { return this.unreachable.compareAndSet(false, true); } From b7fd0dee9912c744518c62f59c40fc30b3a6320f Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 18 Jun 2021 15:30:15 +0800 Subject: [PATCH 07/10] fix some bug Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/ConfigUtils.java | 2 ++ src/main/java/org/tikv/common/TiConfiguration.java | 6 ++++++ src/main/java/org/tikv/common/TiSession.java | 2 +- .../tikv/common/region/AbstractRegionStoreClient.java | 9 +++++++-- .../java/org/tikv/common/region/RegionManager.java | 10 +++++----- .../java/org/tikv/common/region/RegionStoreClient.java | 8 +++++--- .../tikv/common/region/UnreachableStoreChecker.java | 6 ++---- 7 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 1b0ae0dd9e1..f4c06598ba2 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -49,10 +49,12 @@ public class ConfigUtils { public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping"; public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward"; + public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "600ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; + public static final int DEF_CHECK_HEALTH_TIMEOUT = 40; public static final int DEF_SCAN_BATCH_SIZE = 10240; public static final int DEF_MAX_FRAME_SIZE = 268435456 * 2; // 256 * 2 MB public static final int DEF_INDEX_SCAN_BATCH_SIZE = 20000; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 7f782cb8dc0..c38fd44bbc7 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -76,6 +76,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT); setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME); setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE); + setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT); } public static void listAll() { @@ -258,6 +259,7 @@ private static ReplicaRead getReplicaRead(String key) { private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE); private int metricsPort = getInt(TIKV_METRICS_PORT); + private int grpcHealthCheckTimeout = getInt(TIKV_GRPC_HEALTH_CHECK_TIMEOUT); private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME); @@ -541,4 +543,8 @@ public String getNetworkMappingName() { public boolean getEnableGrpcForward() { return this.enableGrpcForward; } + + public long getGrpcHealthCheckTimeout() { + return this.grpcHealthCheckTimeout; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 82f833b6fe9..749db0dc6c8 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -81,6 +81,7 @@ public TiSession(TiConfiguration conf) { this.conf = conf; this.channelFactory = new ChannelFactory(conf.getMaxFrameSize()); this.client = PDClient.createRaw(conf, channelFactory); + this.enableGrpcForward = conf.getEnableGrpcForward(); if (conf.isMetricsEnable()) { try { this.collectorRegistry = new CollectorRegistry(); @@ -91,7 +92,6 @@ public TiSession(TiConfiguration conf) { this.collectorRegistry.register(RetryPolicy.GRPC_SINGLE_REQUEST_LATENCY); this.collectorRegistry.register(RegionManager.GET_REGION_BY_KEY_REQUEST_LATENCY); this.collectorRegistry.register(PDClient.PD_GET_REGION_BY_KEY_REQUEST_LATENCY); - this.enableGrpcForward = conf.getEnableGrpcForward(); this.server = new HTTPServer( new InetSocketAddress(conf.getMetricsPort()), this.collectorRegistry, true); diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index fc97152b6c0..a2d90ce47a6 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -128,8 +128,8 @@ public boolean onStoreUnreachable() { if (proxyRegion == null) { return false; } + regionManager.updateRegion(region, proxyRegion); region = proxyRegion; - regionManager.updateRegion(proxyRegion); String addressStr = region.getProxyStore().getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); @@ -141,10 +141,15 @@ public boolean onStoreUnreachable() { } private boolean checkHealth(TiStore store) { + if (store.getStore() == null) { + return false; + } String addressStr = store.getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel); + HealthGrpc.HealthBlockingStub stub = + HealthGrpc.newBlockingStub(channel) + .withDeadlineAfter(conf.getGrpcHealthCheckTimeout(), TimeUnit.MILLISECONDS); HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); try { HealthCheckResponse resp = stub.check(req); diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 075ee04bf60..62008a9db49 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -85,7 +85,7 @@ public RegionManager( UnreachableStoreChecker storeChecker = new UnreachableStoreChecker(channelFactory, pdClient); this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); - this.executor.schedule(storeChecker, 2, TimeUnit.SECONDS); + this.executor.scheduleAtFixedRate(storeChecker, 5, 5, TimeUnit.SECONDS); } else { this.storeChecker = null; this.executor = null; @@ -211,8 +211,8 @@ public synchronized TiRegion updateLeader(TiRegion region, long storeId) { return null; } - public boolean updateRegion(TiRegion region) { - return cache.updateRegion(region); + public boolean updateRegion(TiRegion oldRegion, TiRegion region) { + return cache.updateRegion(oldRegion, region); } /** @@ -335,13 +335,13 @@ public synchronized void invalidateRegion(TiRegion region) { } } - public synchronized boolean updateRegion(TiRegion region) { + public synchronized boolean updateRegion(TiRegion expected, TiRegion region) { try { if (logger.isDebugEnabled()) { logger.debug(String.format("invalidateRegion ID[%s]", region.getId())); } TiRegion oldRegion = regionCache.get(region.getId()); - if (oldRegion != null && !oldRegion.getRegionEpoch().equals(region.getRegionEpoch())) { + if (expected != oldRegion) { return false; } else { if (oldRegion != null) { diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index f9f31f25840..0be777ab681 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -1272,9 +1272,11 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store } else { // If the store is reachable, which is update by check-health thread if (!store.isUnreachable()) { - TiRegion newRegion = region.switchProxyStore(null); - if (regionManager.updateRegion(newRegion)) { - region = newRegion; + if (region.getProxyStore() != null) { + TiRegion newRegion = region.switchProxyStore(null); + if (regionManager.updateRegion(region, newRegion)) { + region = newRegion; + } } } channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java index 4cd809473e6..05273ecf4fc 100644 --- a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java +++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java @@ -6,12 +6,11 @@ import io.grpc.health.v1.HealthGrpc; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; -public class UnreachableStoreChecker implements Callable { +public class UnreachableStoreChecker implements Runnable { private ConcurrentHashMap stores; private List taskQueue; private final ChannelFactory channelFactory; @@ -44,7 +43,7 @@ private List getUnhealthStore() { } @Override - public Object call() throws Exception { + public void run() { List unhealthStore = getUnhealthStore(); List restStore = new LinkedList<>(); for (TiStore store : unhealthStore) { @@ -72,6 +71,5 @@ public Object call() throws Exception { } this.taskQueue = restStore; } - return null; } } From 000e6fb4ae1ba2879c615de933b5d34cd3d16908 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 18 Jun 2021 16:24:47 +0800 Subject: [PATCH 08/10] address comment Signed-off-by: Little-Wallace --- .../org/tikv/common/region/RegionManager.java | 5 +- .../region/UnreachableStoreChecker.java | 47 ++++++++++--------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 62008a9db49..128509408cc 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -374,7 +374,10 @@ public synchronized void invalidateAllRegionForStore(long storeId) { } public synchronized void invalidateStore(long storeId) { - storeCache.remove(storeId); + TiStore store = storeCache.remove(storeId); + if (store != null) { + store.markReachable(); + } } public synchronized TiStore getStoreById(long id, BackOffer backOffer) { diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java index 05273ecf4fc..43c6cd54a04 100644 --- a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java +++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java @@ -6,47 +6,58 @@ import io.grpc.health.v1.HealthGrpc; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; public class UnreachableStoreChecker implements Runnable { private ConcurrentHashMap stores; - private List taskQueue; + private BlockingQueue taskQueue; private final ChannelFactory channelFactory; private final ReadOnlyPDClient pdClient; public UnreachableStoreChecker(ChannelFactory channelFactory, ReadOnlyPDClient pdClient) { this.stores = new ConcurrentHashMap(); - this.taskQueue = new LinkedList<>(); + this.taskQueue = new LinkedBlockingQueue<>(); this.channelFactory = channelFactory; this.pdClient = pdClient; } public void scheduleStoreHealthCheck(TiStore store) { TiStore oldStore = this.stores.get(Long.valueOf(store.getId())); - if (oldStore != null) { + if (oldStore == store) { return; } - synchronized (this.taskQueue) { - this.stores.put(Long.valueOf(store.getId()), store); - this.taskQueue.add(store); + this.stores.put(Long.valueOf(store.getId()), store); + if (!this.taskQueue.add(store)) { + // add queue false, mark it reachable so that it can be put again. + store.markReachable(); } } private List getUnhealthStore() { - synchronized (this.taskQueue) { - List unhealthStore = new LinkedList<>(); - unhealthStore.addAll(this.taskQueue); - return unhealthStore; + List unhealthStore = new LinkedList<>(); + while (!this.taskQueue.isEmpty()) { + try { + TiStore store = this.taskQueue.take(); + unhealthStore.add(store); + } catch (Exception e) { + return unhealthStore; + } } + return unhealthStore; } @Override public void run() { List unhealthStore = getUnhealthStore(); - List restStore = new LinkedList<>(); for (TiStore store : unhealthStore) { + if (!store.isUnreachable()) { + continue; + } String addressStr = store.getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); HealthGrpc.HealthBlockingStub stub = HealthGrpc.newBlockingStub(channel); @@ -58,18 +69,10 @@ public void run() { this.stores.remove(Long.valueOf(store.getId())); continue; } - } finally { - } - restStore.add(store); - } - synchronized (this.taskQueue) { - int idx = unhealthStore.size(); - if (idx < this.taskQueue.size()) { - for (int i = idx; i < this.taskQueue.size(); i++) { - restStore.add(this.taskQueue.get(i)); - } + this.taskQueue.add(store); + } catch (Exception e){ + this.taskQueue.add(store); } - this.taskQueue = restStore; } } } From 7953a2e4db7005909dfab94ee2df6acb704e0553 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 18 Jun 2021 16:39:41 +0800 Subject: [PATCH 09/10] refactor checker Signed-off-by: Little-Wallace --- .../java/org/tikv/common/region/UnreachableStoreChecker.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java index 43c6cd54a04..2c948b83714 100644 --- a/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java +++ b/src/main/java/org/tikv/common/region/UnreachableStoreChecker.java @@ -9,7 +9,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; - import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; @@ -56,7 +55,7 @@ public void run() { List unhealthStore = getUnhealthStore(); for (TiStore store : unhealthStore) { if (!store.isUnreachable()) { - continue; + continue; } String addressStr = store.getStore().getAddress(); ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); @@ -70,7 +69,7 @@ public void run() { continue; } this.taskQueue.add(store); - } catch (Exception e){ + } catch (Exception e) { this.taskQueue.add(store); } } From e70177f688c97790f5dde1f300dfcd8df6776049 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 18 Jun 2021 17:41:26 +0800 Subject: [PATCH 10/10] try storeunreachable first Signed-off-by: Little-Wallace --- .../org/tikv/common/operation/RegionErrorHandler.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index ccfe46a558f..c783c4f3b30 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -166,13 +166,8 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { @Override public boolean handleRequestError(BackOffer backOffer, Exception e) { - Status status = Status.fromThrowable(e); - if (status == Status.UNAVAILABLE || status == Status.DEADLINE_EXCEEDED) { - if (recv.onStoreUnreachable()) { - return true; - } else { - regionManager.onRequestFail(recv.getRegion()); - } + if (recv.onStoreUnreachable()) { + return true; } else { regionManager.onRequestFail(recv.getRegion()); }