From cd823d4450ba8624697bc1039a42d1cd3256189f Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sat, 13 Nov 2021 22:03:59 +0800 Subject: [PATCH 01/21] optimize grpc forward by get leader with async Signed-off-by: birdstorm --- .../java/org/tikv/common/TiConfiguration.java | 4 +- .../tikv/common/operation/KVErrorHandler.java | 2 +- .../common/operation/RegionErrorHandler.java | 6 +- .../region/AbstractRegionStoreClient.java | 250 ++++++------------ .../common/region/RegionErrorReceiver.java | 6 +- .../tikv/common/region/RegionStoreClient.java | 1 + .../common/region/StoreHealthyChecker.java | 8 +- .../java/org/tikv/common/region/TiStore.java | 23 +- 8 files changed, 95 insertions(+), 205 deletions(-) diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 0d2a65580d0..614cfbdeaa3 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -35,9 +35,9 @@ public class TiConfiguration implements Serializable { private static final Logger logger = LoggerFactory.getLogger(TiConfiguration.class); private static final ConcurrentHashMap settings = new ConcurrentHashMap<>(); - public static final Metadata.Key FORWARD_META_DATA_KEY = + public static final Metadata.Key FORWARD_META_DATA_KEY = Metadata.Key.of("tikv-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); - public static final Metadata.Key PD_FORWARD_META_DATA_KEY = + public static final Metadata.Key PD_FORWARD_META_DATA_KEY = Metadata.Key.of("pd-forwarded-host", Metadata.ASCII_STRING_MARSHALLER); static { diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index dfbe24c5a65..37b95a4e7aa 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -95,7 +95,7 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (error != null) { return regionHandler.handleRegionError(backOffer, error); } else { - regionHandler.tryUpdateRegionStore(); + regionHandler.switchLeaderOrForwardRequest(); } // Key error handling logic diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index cec220440c3..d36753fbbfc 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -47,13 +47,13 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { if (error != null) { return handleRegionError(backOffer, error); } else { - tryUpdateRegionStore(); + switchLeaderOrForwardRequest(); } return false; } - public void tryUpdateRegionStore() { - recv.tryUpdateRegionStore(); + public void switchLeaderOrForwardRequest() { + recv.switchLeaderOrForwardRequest(); } public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 57551f1e2c8..fb4af2d136e 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -20,21 +20,32 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; import io.grpc.Metadata; +import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; + +import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.function.Supplier; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; import org.tikv.common.exception.GrpcException; +import org.tikv.common.operation.RegionErrorHandler; import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; +import org.tikv.raw.RawKVClient; public abstract class AbstractRegionStoreClient extends AbstractGRPCClient @@ -43,11 +54,7 @@ public abstract class AbstractRegionStoreClient protected final RegionManager regionManager; protected TiRegion region; - protected TiStore targetStore; - protected TiStore originStore; - private long retryForwardTimes; - private long retryLeaderTimes; - private Metapb.Peer candidateLeader; + protected TiStore store; protected AbstractRegionStoreClient( TiConfiguration conf, @@ -63,14 +70,10 @@ protected AbstractRegionStoreClient( checkArgument(region.getLeader() != null, "Leader Peer is null"); this.region = region; this.regionManager = regionManager; - this.targetStore = store; - this.originStore = null; - this.candidateLeader = null; - this.retryForwardTimes = 0; - this.retryLeaderTimes = 0; - if (this.targetStore.getProxyStore() != null) { + this.store = store; + if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); - } else if (!this.targetStore.isReachable() && !this.targetStore.canForwardFirst()) { + } else if (!this.store.isReachable() && !this.store.canForward()) { onStoreUnreachable(); } } @@ -110,215 +113,112 @@ public boolean onNotLeader(TiRegion newRegion) { return false; } - // If we try one peer but find the leader has not changed, we do not need try other peers. - if (candidateLeader != null - && region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) { - retryLeaderTimes = newRegion.getFollowerList().size(); - originStore = null; + // If we try one peer but find the leader has not changed, we do not need to try other peers. + if (region.getLeader().getStoreId() == newRegion.getLeader().getStoreId()) { + store = null; } - candidateLeader = null; region = newRegion; - targetStore = regionManager.getStoreById(region.getLeader().getStoreId()); + store = regionManager.getStoreById(region.getLeader().getStoreId()); updateClientStub(); return true; } @Override - public boolean onStoreUnreachable() { - if (!targetStore.isValid()) { - logger.warn(String.format("store [%d] has been invalid", targetStore.getId())); - targetStore = regionManager.getStoreById(targetStore.getId()); + public boolean onStoreUnreachable(TiSession session) { + if (!store.isValid()) { + logger.warn(String.format("store [%d] has been invalid", store.getId())); + store = regionManager.getStoreById(store.getId()); updateClientStub(); return true; } - if (targetStore.getProxyStore() == null) { - if (targetStore.isReachable()) { + if (store.getProxyStore() == null) { + if (store.isReachable()) { return true; } } - // If this store has failed to forward request too many times, we shall try other peer at first - // so that we can - // reduce the latency cost by fail requests. - if (targetStore.canForwardFirst()) { - if (retryOtherStoreByProxyForward()) { - return true; - } - if (retryOtherStoreLeader()) { - return true; - } - } else { - if (retryOtherStoreLeader()) { - return true; - } - if (retryOtherStoreByProxyForward()) { - return true; - } + CompletableFuture>[] futureArray = new CompletableFuture[region.getFollowerList().size() * 2]; + int num = 0; + for (Metapb.Peer peer: region.getFollowerList()) { + futureArray[num++] = CompletableFuture.supplyAsync(() -> { + ByteString key = region.getStartKey(); + RegionStoreClient client = session.getRegionStoreClientBuilder().build(key); + TikvGrpc.TikvBlockingStub stub = getBlockingStub(); + Supplier factory = + () -> Kvrpcpb.RawGetRequest.newBuilder().setContext(makeContext(TiStoreType.TiKV)).setKey(key).build(); + Callable callable = () -> ClientCalls.blockingUnaryCall(client.getBlockingStub().getChannel(), TikvGrpc.getRawGetMethod(), stub.getCallOptions(), factory.get()); + + try { + Kvrpcpb.RawGetResponse resp = callable.call(); + if (resp.hasRegionError()) { + Errorpb.Error error = resp.getRegionError(); + if (error.hasNotLeader()) { + return Pair.create(peer, error.getNotLeader().getLeader()); + } + } + } catch (Exception e) { + // ignore exception + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + return null; + }); } + CompletableFuture.anyOf(futureArray).thenAccept((Metapb.Peer answered, Metapb.Peer leader) -> { + switchProxyStore(answered, leader); + }); + + // If this store has failed to forward request too many times, we shall try other peer at first + // so that we can reduce the latency cost by fail requests. logger.warn( String.format( "retry time exceed for region[%d], invalid store[%d]", - region.getId(), targetStore.getId())); + region.getId(), store.getId())); regionManager.onRequestFail(region); return false; } protected Kvrpcpb.Context makeContext(TiStoreType storeType) { - if (candidateLeader != null && storeType == TiStoreType.TiKV) { - return region.getReplicaContext(candidateLeader, java.util.Collections.emptySet()); - } else { - return region.getReplicaContext(java.util.Collections.emptySet(), storeType); - } + return region.getReplicaContext(java.util.Collections.emptySet(), storeType); } protected Kvrpcpb.Context makeContext(Set resolvedLocks, TiStoreType storeType) { - if (candidateLeader != null && storeType == TiStoreType.TiKV) { - return region.getReplicaContext(candidateLeader, resolvedLocks); - } else { - return region.getReplicaContext(resolvedLocks, storeType); - } + return region.getReplicaContext(resolvedLocks, storeType); } @Override - public void tryUpdateRegionStore() { - if (originStore != null) { - if (originStore.getId() == targetStore.getId()) { - logger.warn( - String.format( - "update store [%s] by proxy-store [%s]", - targetStore.getStore().getAddress(), targetStore.getProxyStore().getAddress())); - // We do not need to mark the store can-forward, because if one store has grpc forward - // successfully, it will - // create a new store object, which is can-forward. - regionManager.updateStore(originStore, targetStore); - } else { - originStore.forwardFail(); - } - } - if (candidateLeader != null) { + public void switchLeaderOrForwardRequest() { + if (leader != null) { logger.warn( String.format( "update leader to store [%d] for region[%d]", - candidateLeader.getStoreId(), region.getId())); - this.regionManager.updateLeader(region, candidateLeader.getStoreId()); + region.getLeader().getStoreId(), region.getId())); + this.regionManager.updateLeader(region, leader.getStoreId()); } } - private boolean retryOtherStoreLeader() { - List peers = region.getFollowerList(); - if (retryLeaderTimes >= peers.size()) { - return false; - } - retryLeaderTimes += 1; - boolean hasVisitedStore = false; - for (Metapb.Peer cur : peers) { - if (candidateLeader == null || hasVisitedStore) { - TiStore store = regionManager.getStoreById(cur.getStoreId()); - if (store != null && store.isReachable()) { - targetStore = store; - candidateLeader = cur; - logger.warn( - String.format( - "try store [%d],peer[%d] for region[%d], which may be new leader", - targetStore.getId(), candidateLeader.getId(), region.getId())); - updateClientStub(); - return true; - } else { - continue; - } - } else if (candidateLeader.getId() == cur.getId()) { - hasVisitedStore = true; - } - } - candidateLeader = null; - retryLeaderTimes = peers.size(); - return false; - } - private void updateClientStub() { - String addressStr = targetStore.getStore().getAddress(); - if (targetStore.getProxyStore() != null) { - addressStr = targetStore.getProxyStore().getAddress(); + String addressStr = store.getStore().getAddress(); + if (store.getProxyStore() != null) { + addressStr = store.getProxyStore().getAddress(); } ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); - if (targetStore.getProxyStore() != null) { + if (store.getProxyStore() != null) { Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, targetStore.getStore().getAddress()); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); blockingStub = MetadataUtils.attachHeaders(blockingStub, header); asyncStub = MetadataUtils.attachHeaders(asyncStub, header); } } - private boolean retryOtherStoreByProxyForward() { - if (!conf.getEnableGrpcForward()) { - return false; - } - if (retryForwardTimes >= region.getFollowerList().size()) { - // If we try to forward request to leader by follower failed, it means that the store of old - // leader may be - // unavailable but the new leader has not been report to PD. So we can ban this store for a - // short time to - // avoid too many request try forward rather than try other peer. - if (originStore != null) { - originStore.forwardFail(); - } - return false; - } - TiStore proxyStore = switchProxyStore(); - if (proxyStore == null) { - logger.warn( - String.format( - "no forward store can be selected for store [%s] and region[%d]", - targetStore.getStore().getAddress(), region.getId())); - if (originStore != null) { - originStore.forwardFail(); - } else { - targetStore.forwardFail(); - } - return false; - } - if (originStore == null) { - originStore = targetStore; - if (this.targetStore.getProxyStore() != null) { - this.timeout = conf.getForwardTimeout(); - } - } - targetStore = proxyStore; - retryForwardTimes += 1; - updateClientStub(); - logger.warn( - String.format( - "forward request to store [%s] by store [%s] for region[%d]", - targetStore.getStore().getAddress(), - targetStore.getProxyStore().getAddress(), - region.getId())); - return true; - } - - private TiStore switchProxyStore() { - boolean hasVisitedStore = false; - List peers = region.getFollowerList(); - if (peers.isEmpty()) { - return null; - } - Metapb.Store proxyStore = targetStore.getProxyStore(); - if (proxyStore == null || peers.get(peers.size() - 1).getStoreId() == proxyStore.getId()) { - hasVisitedStore = true; - } - for (Metapb.Peer peer : peers) { - if (hasVisitedStore) { - TiStore store = regionManager.getStoreById(peer.getStoreId()); - if (store.isReachable()) { - return targetStore.withProxy(store.getStore()); - } - } else if (peer.getStoreId() == proxyStore.getId()) { - hasVisitedStore = true; - } - } - return null; + private TiStore switchProxyStore(Metapb.Peer answered, Metapb.Peer leader) { + // } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 0a5bcabf037..91bb4297c03 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -17,13 +17,15 @@ package org.tikv.common.region; +import org.tikv.common.TiSession; + public interface RegionErrorReceiver { boolean onNotLeader(TiRegion region); /// return whether we need to retry this request. - boolean onStoreUnreachable(); + boolean onStoreUnreachable(TiSession session); - void tryUpdateRegionStore(); + void switchLeaderOrForwardRequest(); TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index c747d9216c4..eefe978847c 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -30,6 +30,7 @@ import io.grpc.stub.MetadataUtils; import io.prometheus.client.Histogram; import java.util.*; +import java.util.concurrent.Future; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index fbc75cb534f..b2bcdbe1fd4 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -19,12 +19,12 @@ public class StoreHealthyChecker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class); private static final long MAX_CHECK_STORE_TOMBSTONE_TICK = 60; - private BlockingQueue taskQueue; + private final BlockingQueue taskQueue; private final ChannelFactory channelFactory; private final ReadOnlyPDClient pdClient; private final RegionCache cache; private long checkTombstoneTick; - private long timeout; + private final long timeout; public StoreHealthyChecker( ChannelFactory channelFactory, ReadOnlyPDClient pdClient, RegionCache cache, long timeout) { @@ -122,8 +122,8 @@ public void run() { "store [%s] recovers to be reachable and canforward", store.getAddress())); store.markReachable(); } - if (!store.canForwardFirst()) { - store.makrCanForward(); + if (!store.canForward()) { + store.markCanForward(); } } } else if (store.isReachable()) { diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index a6346f44c3d..007856f7f44 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -5,13 +5,11 @@ import org.tikv.kvproto.Metapb; public class TiStore { - private static long MAX_FAIL_FORWARD_TIMES = 4; private final Metapb.Store store; private final Metapb.Store proxyStore; - private AtomicBoolean reachable; - private AtomicBoolean valid; - private AtomicLong failForwardCount; - private AtomicBoolean canForward; + private final AtomicBoolean reachable; + private final AtomicBoolean valid; + private final AtomicBoolean canForward; public TiStore(Metapb.Store store) { this.store = store; @@ -19,7 +17,6 @@ public TiStore(Metapb.Store store) { this.valid = new AtomicBoolean(true); this.canForward = new AtomicBoolean(true); this.proxyStore = null; - this.failForwardCount = new AtomicLong(0); } private TiStore(Metapb.Store store, Metapb.Store proxyStore) { @@ -32,7 +29,6 @@ private TiStore(Metapb.Store store, Metapb.Store proxyStore) { this.valid = new AtomicBoolean(true); this.canForward = new AtomicBoolean(true); this.proxyStore = proxyStore; - this.failForwardCount = new AtomicLong(0); } @java.lang.Override @@ -81,20 +77,11 @@ public void markInvalid() { this.valid.set(false); } - public void forwardFail() { - if (this.canForward.get()) { - if (this.failForwardCount.addAndGet(1) >= MAX_FAIL_FORWARD_TIMES) { - this.canForward.set(false); - } - } - } - - public void makrCanForward() { - this.failForwardCount.set(0); + public void markCanForward() { this.canForward.set(true); } - public boolean canForwardFirst() { + public boolean canForward() { return this.canForward.get(); } From 06ce2a02c0ea8aca064f727516b138a17725039c Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 14:01:28 +0800 Subject: [PATCH 02/21] add switch proxy store Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 34 ++++++++++++++----- .../common/region/RegionErrorReceiver.java | 2 +- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index fb4af2d136e..28a791f9454 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -20,13 +20,18 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; import io.grpc.Metadata; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.*; @@ -124,7 +129,7 @@ public boolean onNotLeader(TiRegion newRegion) { } @Override - public boolean onStoreUnreachable(TiSession session) { + public boolean onStoreUnreachable() { if (!store.isValid()) { logger.warn(String.format("store [%d] has been invalid", store.getId())); store = regionManager.getStoreById(store.getId()); @@ -138,16 +143,17 @@ public boolean onStoreUnreachable(TiSession session) { } } - CompletableFuture>[] futureArray = new CompletableFuture[region.getFollowerList().size() * 2]; + CompletableFuture>[] futureArray = new CompletableFuture[region.getFollowerList().size()]; int num = 0; for (Metapb.Peer peer: region.getFollowerList()) { futureArray[num++] = CompletableFuture.supplyAsync(() -> { ByteString key = region.getStartKey(); - RegionStoreClient client = session.getRegionStoreClientBuilder().build(key); + TiStore store = regionManager.getStoreById(peer.getStoreId()); + ManagedChannel channel = channelFactory.getChannel(store.getAddress(), regionManager.getPDClient().getHostMapping()); TikvGrpc.TikvBlockingStub stub = getBlockingStub(); Supplier factory = () -> Kvrpcpb.RawGetRequest.newBuilder().setContext(makeContext(TiStoreType.TiKV)).setKey(key).build(); - Callable callable = () -> ClientCalls.blockingUnaryCall(client.getBlockingStub().getChannel(), TikvGrpc.getRawGetMethod(), stub.getCallOptions(), factory.get()); + Callable callable = () -> ClientCalls.blockingUnaryCall(channel, TikvGrpc.getRawGetMethod(), stub.getCallOptions(), factory.get()); try { Kvrpcpb.RawGetResponse resp = callable.call(); @@ -166,10 +172,10 @@ public boolean onStoreUnreachable(TiSession session) { } } return null; - }); + }, Executors.newFixedThreadPool(region.getFollowerList().size())); } - CompletableFuture.anyOf(futureArray).thenAccept((Metapb.Peer answered, Metapb.Peer leader) -> { - switchProxyStore(answered, leader); + CompletableFuture.anyOf(futureArray).thenAccept((pair) ->{ + switchProxyStore(pair.first, pair.second); }); // If this store has failed to forward request too many times, we shall try other peer at first @@ -219,6 +225,18 @@ private void updateClientStub() { } private TiStore switchProxyStore(Metapb.Peer answered, Metapb.Peer leader) { - // + List peers = region.getFollowerList(); + if (peers.isEmpty()) { + return null; + } + for(Metapb.Peer peer: peers) { + TiStore store = regionManager.getStoreById(peer.getStoreId()); + ManagedChannel channel = channelFactory.getChannel(store.getAddress(), regionManager.getPDClient().getHostMapping()); + HealthGrpc.HealthFutureStub stub = HealthGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + ListenableFuture task = stub.check(req); + } } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 91bb4297c03..7cc90d3711f 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -23,7 +23,7 @@ public interface RegionErrorReceiver { boolean onNotLeader(TiRegion region); /// return whether we need to retry this request. - boolean onStoreUnreachable(TiSession session); + boolean onStoreUnreachable(); void switchLeaderOrForwardRequest(); From eb0482a2f12a05319656449f31e7266c893db22d Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 17:15:56 +0800 Subject: [PATCH 03/21] refine logic Signed-off-by: birdstorm --- .../org/tikv/common/AbstractGRPCClient.java | 14 +- src/main/java/org/tikv/common/PDClient.java | 14 +- .../tikv/common/operation/KVErrorHandler.java | 2 - .../common/operation/RegionErrorHandler.java | 6 - .../region/AbstractRegionStoreClient.java | 221 ++++++++++++------ .../common/region/RegionErrorReceiver.java | 4 - .../tikv/common/region/RegionStoreClient.java | 13 +- .../java/org/tikv/common/region/TiStore.java | 1 - .../tikv/txn/AbstractLockResolverClient.java | 2 +- .../org/tikv/txn/LockResolverClientV2.java | 4 +- .../org/tikv/txn/LockResolverClientV3.java | 4 +- .../org/tikv/txn/LockResolverClientV4.java | 4 +- 12 files changed, 183 insertions(+), 106 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 813b98629aa..dd886eb0700 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -23,6 +23,7 @@ import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.AbstractFutureStub; import io.grpc.stub.AbstractStub; import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; @@ -38,14 +39,15 @@ import org.tikv.common.util.ChannelFactory; public abstract class AbstractGRPCClient< - BlockingStubT extends AbstractStub, StubT extends AbstractStub> + BlockingStubT extends AbstractStub, + FutureStubT extends AbstractFutureStub> implements AutoCloseable { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); protected final ChannelFactory channelFactory; protected TiConfiguration conf; protected long timeout; protected BlockingStubT blockingStub; - protected StubT asyncStub; + protected FutureStubT asyncStub; protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) { this.conf = conf; @@ -57,7 +59,7 @@ protected AbstractGRPCClient( TiConfiguration conf, ChannelFactory channelFactory, BlockingStubT blockingStub, - StubT asyncStub) { + FutureStubT asyncStub) { this.conf = conf; this.timeout = conf.getTimeout(); this.channelFactory = channelFactory; @@ -109,7 +111,7 @@ protected void callAsyncWithRetry( .create(handler) .callWithRetry( () -> { - StubT stub = getAsyncStub(); + FutureStubT stub = getAsyncStub(); ClientCalls.asyncUnaryCall( stub.getChannel().newCall(method, stub.getCallOptions()), requestFactory.get(), @@ -133,7 +135,7 @@ StreamObserver callBidiStreamingWithRetry( .create(handler) .callWithRetry( () -> { - StubT stub = getAsyncStub(); + FutureStubT stub = getAsyncStub(); return asyncBidiStreamingCall( stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); }, @@ -175,7 +177,7 @@ public long getTimeout() { protected abstract BlockingStubT getBlockingStub(); - protected abstract StubT getAsyncStub(); + protected abstract FutureStubT getAsyncStub(); protected boolean checkHealth(String addressStr, HostMapping hostMapping) { ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index b639a4b9519..f6dab2c4bc0 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -69,7 +69,7 @@ import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.PDGrpc; import org.tikv.kvproto.PDGrpc.PDBlockingStub; -import org.tikv.kvproto.PDGrpc.PDStub; +import org.tikv.kvproto.PDGrpc.PDFutureStub; import org.tikv.kvproto.Pdpb; import org.tikv.kvproto.Pdpb.Error; import org.tikv.kvproto.Pdpb.ErrorType; @@ -92,7 +92,7 @@ import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoResponse; -public class PDClient extends AbstractGRPCClient +public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync"; private static final long MIN_TRY_UPDATE_DURATION = 50; @@ -552,7 +552,7 @@ protected PDBlockingStub getBlockingStub() { } @Override - protected PDStub getAsyncStub() { + protected PDFutureStub getAsyncStub() { if (pdClientWrapper == null) { throw new GrpcException("PDClient may not be initialized"); } @@ -635,7 +635,7 @@ private void initCluster() { static class PDClientWrapper { private final String leaderInfo; private final PDBlockingStub blockingStub; - private final PDStub asyncStub; + private final PDFutureStub asyncStub; private final long createTime; private final String storeAddress; @@ -646,10 +646,10 @@ static class PDClientWrapper { header.put(TiConfiguration.PD_FORWARD_META_DATA_KEY, addrToUri(leaderInfo).toString()); this.blockingStub = MetadataUtils.attachHeaders(PDGrpc.newBlockingStub(clientChannel), header); - this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newStub(clientChannel), header); + this.asyncStub = MetadataUtils.attachHeaders(PDGrpc.newFutureStub(clientChannel), header); } else { this.blockingStub = PDGrpc.newBlockingStub(clientChannel); - this.asyncStub = PDGrpc.newStub(clientChannel); + this.asyncStub = PDGrpc.newFutureStub(clientChannel); } this.leaderInfo = leaderInfo; this.storeAddress = storeAddress; @@ -668,7 +668,7 @@ PDBlockingStub getBlockingStub() { return blockingStub; } - PDStub getAsyncStub() { + PDFutureStub getAsyncStub() { return asyncStub; } diff --git a/src/main/java/org/tikv/common/operation/KVErrorHandler.java b/src/main/java/org/tikv/common/operation/KVErrorHandler.java index 37b95a4e7aa..3321b560740 100644 --- a/src/main/java/org/tikv/common/operation/KVErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/KVErrorHandler.java @@ -94,8 +94,6 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { Errorpb.Error error = regionHandler.getRegionError(resp); if (error != null) { return regionHandler.handleRegionError(backOffer, error); - } else { - regionHandler.switchLeaderOrForwardRequest(); } // Key error handling logic diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index d36753fbbfc..230ceedaa34 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -46,16 +46,10 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { Errorpb.Error error = getRegionError(resp); if (error != null) { return handleRegionError(backOffer, error); - } else { - switchLeaderOrForwardRequest(); } return false; } - public void switchLeaderOrForwardRequest() { - recv.switchLeaderOrForwardRequest(); - } - public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { if (error.hasNotLeader()) { // this error is reported from raftstore: diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 28a791f9454..fcf6bdd1a93 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -29,31 +29,23 @@ import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; - -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.*; -import java.util.function.Supplier; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.AbstractGRPCClient; import org.tikv.common.TiConfiguration; -import org.tikv.common.TiSession; import org.tikv.common.exception.GrpcException; -import org.tikv.common.operation.RegionErrorHandler; import org.tikv.common.util.ChannelFactory; -import org.tikv.common.util.Pair; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; -import org.tikv.raw.RawKVClient; public abstract class AbstractRegionStoreClient - extends AbstractGRPCClient + extends AbstractGRPCClient implements RegionErrorReceiver { private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); @@ -67,7 +59,7 @@ protected AbstractRegionStoreClient( TiStore store, ChannelFactory channelFactory, TikvGrpc.TikvBlockingStub blockingStub, - TikvGrpc.TikvStub asyncStub, + TikvGrpc.TikvFutureStub asyncStub, RegionManager regionManager) { super(conf, channelFactory, blockingStub, asyncStub); checkNotNull(region, "Region is empty"); @@ -94,7 +86,7 @@ protected TikvGrpc.TikvBlockingStub getBlockingStub() { } @Override - protected TikvGrpc.TikvStub getAsyncStub() { + protected TikvGrpc.TikvFutureStub getAsyncStub() { return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); } @@ -143,47 +135,49 @@ public boolean onStoreUnreachable() { } } - CompletableFuture>[] futureArray = new CompletableFuture[region.getFollowerList().size()]; - int num = 0; - for (Metapb.Peer peer: region.getFollowerList()) { - futureArray[num++] = CompletableFuture.supplyAsync(() -> { - ByteString key = region.getStartKey(); - TiStore store = regionManager.getStoreById(peer.getStoreId()); - ManagedChannel channel = channelFactory.getChannel(store.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvBlockingStub stub = getBlockingStub(); - Supplier factory = - () -> Kvrpcpb.RawGetRequest.newBuilder().setContext(makeContext(TiStoreType.TiKV)).setKey(key).build(); - Callable callable = () -> ClientCalls.blockingUnaryCall(channel, TikvGrpc.getRawGetMethod(), stub.getCallOptions(), factory.get()); - - try { - Kvrpcpb.RawGetResponse resp = callable.call(); - if (resp.hasRegionError()) { - Errorpb.Error error = resp.getRegionError(); - if (error.hasNotLeader()) { - return Pair.create(peer, error.getNotLeader().getLeader()); - } - } - } catch (Exception e) { - // ignore exception - try { - Thread.sleep(50); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - } - return null; - }, Executors.newFixedThreadPool(region.getFollowerList().size())); + List peers = region.getFollowerList(); + if (peers.isEmpty()) { + // no followers available, retry + regionManager.onRequestFail(region); + return false; + } + + Metapb.Peer peer = switchLeader(); + if (peer == null) { + // leader is not elected, just wait until it is ready. + return true; + } + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + if (currentLeaderStore.isReachable()) { + // switch to leader store + store = currentLeaderStore; + updateClientStub(); + return true; + } + if (conf.getEnableGrpcForward()) { + // when current leader cannot be reached + TiStore storeWithProxy = switchProxyStore(); + if (storeWithProxy == null) { + // no store available, retry + regionManager.onRequestFail(region); + return false; + } + if (storeWithProxy.getStore().getId() == store.getStore().getId()) { + // the store is back online + return true; + } + // use proxy store to forward requests + regionManager.updateStore(store, storeWithProxy); + store = storeWithProxy; + updateClientStub(); + return true; } - CompletableFuture.anyOf(futureArray).thenAccept((pair) ->{ - switchProxyStore(pair.first, pair.second); - }); - // If this store has failed to forward request too many times, we shall try other peer at first + // If this store has failed to forward request, we shall try other peer at first // so that we can reduce the latency cost by fail requests. logger.warn( String.format( - "retry time exceed for region[%d], invalid store[%d]", - region.getId(), store.getId())); + "retry time exceed for region[%d], invalid store[%d]", region.getId(), store.getId())); regionManager.onRequestFail(region); return false; } @@ -196,17 +190,6 @@ protected Kvrpcpb.Context makeContext(Set resolvedLocks, TiStoreType store return region.getReplicaContext(resolvedLocks, storeType); } - @Override - public void switchLeaderOrForwardRequest() { - if (leader != null) { - logger.warn( - String.format( - "update leader to store [%d] for region[%d]", - region.getLeader().getStoreId(), region.getId())); - this.regionManager.updateLeader(region, leader.getStoreId()); - } - } - private void updateClientStub() { String addressStr = store.getStore().getAddress(); if (store.getProxyStore() != null) { @@ -215,7 +198,7 @@ private void updateClientStub() { ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); + asyncStub = TikvGrpc.newFutureStub(channel); if (store.getProxyStore() != null) { Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); @@ -224,19 +207,125 @@ private void updateClientStub() { } } - private TiStore switchProxyStore(Metapb.Peer answered, Metapb.Peer leader) { - List peers = region.getFollowerList(); - if (peers.isEmpty()) { - return null; + private Metapb.Peer switchLeader() { + List responses = new LinkedList<>(); + for (Metapb.Peer peer : region.getFollowerList()) { + ByteString key = region.getStartKey(); + TiStore store = regionManager.getStoreById(peer.getStoreId()); + ManagedChannel channel = + channelFactory.getChannel( + store.getAddress(), regionManager.getPDClient().getHostMapping()); + TikvGrpc.TikvFutureStub stub = getAsyncStub(); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(makeContext(TiStoreType.TiKV)) + .setKey(key) + .build(); + ListenableFuture task = + ClientCalls.futureUnaryCall( + channel.newCall(TikvGrpc.getRawGetMethod(), stub.getCallOptions()), rawGetRequest); + responses.add(new SwitchLeaderTask(task, peer)); } - for(Metapb.Peer peer: peers) { + while (true) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + throw new GrpcException(e); + } + List unfinished = new LinkedList<>(); + for (SwitchLeaderTask task : responses) { + if (task.task.isDone()) { + try { + Kvrpcpb.RawGetResponse resp = task.task.get(); + if (resp != null) { + // the peer has answered, it should be reachable. + TiStore peerStore = regionManager.getStoreById(task.peer.getStoreId()); + if (!peerStore.isReachable()) { + peerStore.markReachable(); + } + if (!resp.hasRegionError()) { + // the peer is leader + return task.peer; + } else { + Errorpb.Error error = resp.getRegionError(); + if (error.hasNotLeader()) { + // real leader in not_leader error + return error.getNotLeader().getLeader(); + } + } + } + } catch (Exception ignored) { + } + } else { + unfinished.add(task); + } + } + if (unfinished.isEmpty()) { + return null; + } + responses = unfinished; + } + } + + private TiStore switchProxyStore() { + List responses = new LinkedList<>(); + for (Metapb.Peer peer : region.getFollowerList()) { TiStore store = regionManager.getStoreById(peer.getStoreId()); - ManagedChannel channel = channelFactory.getChannel(store.getAddress(), regionManager.getPDClient().getHostMapping()); - HealthGrpc.HealthFutureStub stub = HealthGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + ManagedChannel channel = + channelFactory.getChannel( + store.getAddress(), regionManager.getPDClient().getHostMapping()); + HealthGrpc.HealthFutureStub stub = + HealthGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); ListenableFuture task = stub.check(req); + responses.add(new ForwardCheckTask(task, store)); + } + while (true) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + throw new GrpcException(e); + } + List unfinished = new LinkedList<>(); + for (ForwardCheckTask task : responses) { + if (task.task.isDone()) { + try { + HealthCheckResponse resp = task.task.get(); + if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { + return store.withProxy(task.store.getStore()); + } + } catch (Exception ignored) { + } + } else { + unfinished.add(task); + } + } + if (unfinished.isEmpty()) { + return null; + } + responses = unfinished; + } + } + + private static class SwitchLeaderTask { + private final ListenableFuture task; + private final Metapb.Peer peer; + + private SwitchLeaderTask(ListenableFuture task, Metapb.Peer peer) { + this.task = task; + this.peer = peer; + } + } + + private static class ForwardCheckTask { + private final ListenableFuture task; + private final TiStore store; + + private ForwardCheckTask(ListenableFuture task, TiStore store) { + this.task = task; + this.store = store; } } } diff --git a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java index 7cc90d3711f..4bee1356eab 100644 --- a/src/main/java/org/tikv/common/region/RegionErrorReceiver.java +++ b/src/main/java/org/tikv/common/region/RegionErrorReceiver.java @@ -17,15 +17,11 @@ package org.tikv.common.region; -import org.tikv.common.TiSession; - public interface RegionErrorReceiver { boolean onNotLeader(TiRegion region); /// return whether we need to retry this request. boolean onStoreUnreachable(); - void switchLeaderOrForwardRequest(); - TiRegion getRegion(); } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index eefe978847c..c31ea5345a6 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -30,7 +30,6 @@ import io.grpc.stub.MetadataUtils; import io.prometheus.client.Histogram; import java.util.*; -import java.util.concurrent.Future; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +48,7 @@ import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; import org.tikv.txn.AbstractLockResolverClient; import org.tikv.txn.Lock; import org.tikv.txn.ResolveLockResult; @@ -94,7 +93,7 @@ private RegionStoreClient( TiStoreType storeType, ChannelFactory channelFactory, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, RegionManager regionManager, PDClient pdClient, RegionStoreClient.RegionStoreClientBuilder clientBuilder) { @@ -125,7 +124,7 @@ private RegionStoreClient( ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel); - TikvStub tikvAsyncStub = TikvGrpc.newStub(channel); + TikvGrpc.TikvFutureStub tikvAsyncStub = TikvGrpc.newFutureStub(channel); this.lockResolverClient = AbstractLockResolverClient.getInstance( @@ -1247,7 +1246,7 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store ManagedChannel channel = null; TikvBlockingStub blockingStub = null; - TikvStub asyncStub = null; + TikvFutureStub asyncStub = null; if (conf.getEnableGrpcForward() && store.getProxyStore() != null && !store.isReachable()) { addressStr = store.getProxyStore().getAddress(); @@ -1256,11 +1255,11 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); blockingStub = MetadataUtils.attachHeaders(TikvGrpc.newBlockingStub(channel), header); - asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newStub(channel), header); + asyncStub = MetadataUtils.attachHeaders(TikvGrpc.newFutureStub(channel), header); } else { channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); + asyncStub = TikvGrpc.newFutureStub(channel); } return new RegionStoreClient( diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index 007856f7f44..bafd118f6d7 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -1,7 +1,6 @@ package org.tikv.common.region; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.tikv.kvproto.Metapb; public class TiStore { diff --git a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java index 020068e272a..7b4ed1c2125 100644 --- a/src/main/java/org/tikv/txn/AbstractLockResolverClient.java +++ b/src/main/java/org/tikv/txn/AbstractLockResolverClient.java @@ -71,7 +71,7 @@ static AbstractLockResolverClient getInstance( TiRegion region, TiStore store, TikvGrpc.TikvBlockingStub blockingStub, - TikvGrpc.TikvStub asyncStub, + TikvGrpc.TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, diff --git a/src/main/java/org/tikv/txn/LockResolverClientV2.java b/src/main/java/org/tikv/txn/LockResolverClientV2.java index 3df5966abda..20033090406 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV2.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV2.java @@ -52,7 +52,7 @@ import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; /** Before v3.0.5 TiDB uses the ttl on secondary lock. */ public class LockResolverClientV2 extends AbstractRegionStoreClient @@ -77,7 +77,7 @@ public LockResolverClientV2( TiRegion region, TiStore store, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager) { super(conf, region, store, channelFactory, blockingStub, asyncStub, regionManager); diff --git a/src/main/java/org/tikv/txn/LockResolverClientV3.java b/src/main/java/org/tikv/txn/LockResolverClientV3.java index 0b8d3c89a8c..b15fa784376 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV3.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV3.java @@ -49,7 +49,7 @@ import org.tikv.kvproto.Kvrpcpb.CleanupResponse; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; /** Since v3.0.5 TiDB ignores the ttl on secondary lock and will use the ttl on primary key. */ public class LockResolverClientV3 extends AbstractRegionStoreClient @@ -78,7 +78,7 @@ public LockResolverClientV3( TiRegion region, TiStore store, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, diff --git a/src/main/java/org/tikv/txn/LockResolverClientV4.java b/src/main/java/org/tikv/txn/LockResolverClientV4.java index 07a5552f0f7..6acc51313c9 100644 --- a/src/main/java/org/tikv/txn/LockResolverClientV4.java +++ b/src/main/java/org/tikv/txn/LockResolverClientV4.java @@ -47,7 +47,7 @@ import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; -import org.tikv.kvproto.TikvGrpc.TikvStub; +import org.tikv.kvproto.TikvGrpc.TikvFutureStub; import org.tikv.txn.exception.TxnNotFoundException; import org.tikv.txn.exception.WriteConflictException; @@ -78,7 +78,7 @@ public LockResolverClientV4( TiRegion region, TiStore store, TikvBlockingStub blockingStub, - TikvStub asyncStub, + TikvFutureStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient, From 7176a9a5a3744d6b2ff5e7bdd0145625b6d32ddd Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 17:30:51 +0800 Subject: [PATCH 04/21] update Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 25 +++---------------- .../common/region/StoreHealthyChecker.java | 3 --- .../java/org/tikv/common/region/TiStore.java | 11 -------- 3 files changed, 4 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index fcf6bdd1a93..12c370cbea0 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -27,7 +27,6 @@ import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthGrpc; -import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; import java.util.LinkedList; import java.util.List; @@ -39,7 +38,6 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; import org.tikv.common.util.ChannelFactory; -import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; @@ -70,7 +68,7 @@ protected AbstractRegionStoreClient( this.store = store; if (this.store.getProxyStore() != null) { this.timeout = conf.getForwardTimeout(); - } else if (!this.store.isReachable() && !this.store.canForward()) { + } else if (!this.store.isReachable()) { onStoreUnreachable(); } } @@ -159,7 +157,6 @@ public boolean onStoreUnreachable() { TiStore storeWithProxy = switchProxyStore(); if (storeWithProxy == null) { // no store available, retry - regionManager.onRequestFail(region); return false; } if (storeWithProxy.getStore().getId() == store.getStore().getId()) { @@ -172,13 +169,6 @@ public boolean onStoreUnreachable() { updateClientStub(); return true; } - - // If this store has failed to forward request, we shall try other peer at first - // so that we can reduce the latency cost by fail requests. - logger.warn( - String.format( - "retry time exceed for region[%d], invalid store[%d]", region.getId(), store.getId())); - regionManager.onRequestFail(region); return false; } @@ -215,15 +205,14 @@ private Metapb.Peer switchLeader() { ManagedChannel channel = channelFactory.getChannel( store.getAddress(), regionManager.getPDClient().getHostMapping()); - TikvGrpc.TikvFutureStub stub = getAsyncStub(); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); Kvrpcpb.RawGetRequest rawGetRequest = Kvrpcpb.RawGetRequest.newBuilder() .setContext(makeContext(TiStoreType.TiKV)) .setKey(key) .build(); - ListenableFuture task = - ClientCalls.futureUnaryCall( - channel.newCall(TikvGrpc.getRawGetMethod(), stub.getCallOptions()), rawGetRequest); + ListenableFuture task = stub.rawGet(rawGetRequest); responses.add(new SwitchLeaderTask(task, peer)); } while (true) { @@ -246,12 +235,6 @@ private Metapb.Peer switchLeader() { if (!resp.hasRegionError()) { // the peer is leader return task.peer; - } else { - Errorpb.Error error = resp.getRegionError(); - if (error.hasNotLeader()) { - // real leader in not_leader error - return error.getNotLeader().getLeader(); - } } } } catch (Exception ignored) { diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index b2bcdbe1fd4..43190718bae 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -122,9 +122,6 @@ public void run() { "store [%s] recovers to be reachable and canforward", store.getAddress())); store.markReachable(); } - if (!store.canForward()) { - store.markCanForward(); - } } } else if (store.isReachable()) { unreachableStore.add(store); diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index bafd118f6d7..dde79975d98 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -8,13 +8,11 @@ public class TiStore { private final Metapb.Store proxyStore; private final AtomicBoolean reachable; private final AtomicBoolean valid; - private final AtomicBoolean canForward; public TiStore(Metapb.Store store) { this.store = store; this.reachable = new AtomicBoolean(true); this.valid = new AtomicBoolean(true); - this.canForward = new AtomicBoolean(true); this.proxyStore = null; } @@ -26,7 +24,6 @@ private TiStore(Metapb.Store store, Metapb.Store proxyStore) { this.reachable = new AtomicBoolean(true); } this.valid = new AtomicBoolean(true); - this.canForward = new AtomicBoolean(true); this.proxyStore = proxyStore; } @@ -76,14 +73,6 @@ public void markInvalid() { this.valid.set(false); } - public void markCanForward() { - this.canForward.set(true); - } - - public boolean canForward() { - return this.canForward.get(); - } - public Metapb.Store getStore() { return this.store; } From a5892b0eab70e472ac9414340c09b18dc31b8687 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 17:39:28 +0800 Subject: [PATCH 05/21] modify Signed-off-by: birdstorm --- .../org/tikv/common/region/AbstractRegionStoreClient.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 12c370cbea0..d9e51cc7a41 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -147,6 +147,8 @@ public boolean onStoreUnreachable() { } TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); if (currentLeaderStore.isReachable()) { + // update region cache + regionManager.updateLeader(region, peer.getStoreId()); // switch to leader store store = currentLeaderStore; updateClientStub(); @@ -159,10 +161,6 @@ public boolean onStoreUnreachable() { // no store available, retry return false; } - if (storeWithProxy.getStore().getId() == store.getStore().getId()) { - // the store is back online - return true; - } // use proxy store to forward requests regionManager.updateStore(store, storeWithProxy); store = storeWithProxy; From 7328570fd9890a572c582a17864fcfa6423b08d8 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 17:47:58 +0800 Subject: [PATCH 06/21] remove mark reachable Signed-off-by: birdstorm --- .../org/tikv/common/region/AbstractRegionStoreClient.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index d9e51cc7a41..602ad3748ce 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -148,7 +148,7 @@ public boolean onStoreUnreachable() { TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); if (currentLeaderStore.isReachable()) { // update region cache - regionManager.updateLeader(region, peer.getStoreId()); + region = regionManager.updateLeader(region, peer.getStoreId()); // switch to leader store store = currentLeaderStore; updateClientStub(); @@ -225,11 +225,6 @@ private Metapb.Peer switchLeader() { try { Kvrpcpb.RawGetResponse resp = task.task.get(); if (resp != null) { - // the peer has answered, it should be reachable. - TiStore peerStore = regionManager.getStoreById(task.peer.getStoreId()); - if (!peerStore.isReachable()) { - peerStore.markReachable(); - } if (!resp.hasRegionError()) { // the peer is leader return task.peer; From 59a2256beaf160a8564493e510d560b2c77a217b Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 17:57:42 +0800 Subject: [PATCH 07/21] fix region peer in context Signed-off-by: birdstorm --- .../org/tikv/common/region/AbstractRegionStoreClient.java | 2 +- src/main/java/org/tikv/common/region/TiRegion.java | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 602ad3748ce..fb782f26c31 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -207,7 +207,7 @@ private Metapb.Peer switchLeader() { TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); Kvrpcpb.RawGetRequest rawGetRequest = Kvrpcpb.RawGetRequest.newBuilder() - .setContext(makeContext(TiStoreType.TiKV)) + .setContext(region.getReplicaContext(peer)) .setKey(key) .build(); ListenableFuture task = stub.rawGet(rawGetRequest); diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 0bb934e4251..c02124126de 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -159,6 +159,10 @@ public Kvrpcpb.Context getReplicaContext(Peer currentPeer, Set resolvedLoc return getContext(currentPeer, resolvedLocks, false); } + public Kvrpcpb.Context getReplicaContext(Peer currentPeer) { + return getContext(currentPeer, java.util.Collections.emptySet(), false); + } + private Kvrpcpb.Context getContext( Peer currentPeer, Set resolvedLocks, boolean replicaRead) { From 0021e50c68a8dc0a90e0d50b7a7e909bcdebede4 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 14 Nov 2021 21:36:43 +0800 Subject: [PATCH 08/21] update sleep time Signed-off-by: Little-Wallace --- .../org/tikv/common/region/AbstractRegionStoreClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index fb782f26c31..4b791c47c2a 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -215,7 +215,7 @@ private Metapb.Peer switchLeader() { } while (true) { try { - Thread.sleep(20); + Thread.sleep(2); } catch (InterruptedException e) { throw new GrpcException(e); } @@ -260,7 +260,7 @@ private TiStore switchProxyStore() { } while (true) { try { - Thread.sleep(20); + Thread.sleep(2); } catch (InterruptedException e) { throw new GrpcException(e); } From 2f3255aa78b4cdc35d6afb90d3084a4b3178dfd0 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Sun, 14 Nov 2021 22:09:35 +0800 Subject: [PATCH 09/21] reduce wait backoff Signed-off-by: Little-Wallace --- src/main/java/org/tikv/common/util/ConcreteBackOffer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index 173109a1cbc..141bb5ed76e 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -131,7 +131,7 @@ private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcTy backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter); break; case BoTiKVRPC: - backOffFunction = BackOffFunction.create(100, 400, BackOffStrategy.EqualJitter); + backOffFunction = BackOffFunction.create(10, 400, BackOffStrategy.EqualJitter); break; case BoTxnNotFound: backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter); From fbe147194f767f32516afcf39163e02324375282 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 23:27:53 +0800 Subject: [PATCH 10/21] add loggers Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 4b791c47c2a..34c4db5bdd4 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -129,6 +129,9 @@ public boolean onStoreUnreachable() { if (store.getProxyStore() == null) { if (store.isReachable()) { + logger.warn( + String.format( + "store[%d] for region[%d] is reachable, retry", store.getId(), region.getId())); return true; } } @@ -136,17 +139,27 @@ public boolean onStoreUnreachable() { List peers = region.getFollowerList(); if (peers.isEmpty()) { // no followers available, retry + logger.warn(String.format("no followers of region[%d] available, retry", region.getId())); regionManager.onRequestFail(region); return false; } + logger.warn(String.format("try switch leader: region[%d]", region.getId())); + Metapb.Peer peer = switchLeader(); if (peer == null) { // leader is not elected, just wait until it is ready. + logger.warn( + String.format( + "leader for region[%d] is not elected, just wait until it is ready", region.getId())); return true; } TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); if (currentLeaderStore.isReachable()) { + logger.warn( + String.format( + "update leader using switchLeader logic from store[%d] to store[%d]", + region.getLeader().getStoreId(), peer.getStoreId())); // update region cache region = regionManager.updateLeader(region, peer.getStoreId()); // switch to leader store @@ -155,10 +168,12 @@ public boolean onStoreUnreachable() { return true; } if (conf.getEnableGrpcForward()) { + logger.warn(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached TiStore storeWithProxy = switchProxyStore(); if (storeWithProxy == null) { // no store available, retry + logger.warn(String.format("No store available, retry: region[%d]", region.getId())); return false; } // use proxy store to forward requests @@ -225,8 +240,12 @@ private Metapb.Peer switchLeader() { try { Kvrpcpb.RawGetResponse resp = task.task.get(); if (resp != null) { + logger.info(String.format("rawGet response received from peer[%s]", task.peer)); if (!resp.hasRegionError()) { // the peer is leader + logger.info( + String.format( + "rawGet response indicates peer[%d] is leader", task.peer.getId())); return task.peer; } } @@ -269,7 +288,13 @@ private TiStore switchProxyStore() { if (task.task.isDone()) { try { HealthCheckResponse resp = task.task.get(); + logger.info( + String.format("healthCheck response received from store[%d]", task.store.getId())); if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { + logger.info( + String.format( + "healthCheck response indicates forward from remote[%s] to remote[%s]", + task.store.getAddress(), store.getAddress())); return store.withProxy(task.store.getStore()); } } catch (Exception ignored) { From 3c6814ba62f04045010f44a3a9c970da35120174 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Sun, 14 Nov 2021 23:55:39 +0800 Subject: [PATCH 11/21] fix grpc forward check Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 61 ++++++++++++------- 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 34c4db5bdd4..4606a3b6273 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -38,6 +38,7 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.Pair; import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Metapb; import org.tikv.kvproto.TikvGrpc; @@ -146,26 +147,39 @@ public boolean onStoreUnreachable() { logger.warn(String.format("try switch leader: region[%d]", region.getId())); - Metapb.Peer peer = switchLeader(); + Pair pair = switchLeader(); + Metapb.Peer peer = pair.first; + boolean exceptionEncountered = pair.second; if (peer == null) { - // leader is not elected, just wait until it is ready. - logger.warn( - String.format( - "leader for region[%d] is not elected, just wait until it is ready", region.getId())); - return true; - } - TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); - if (currentLeaderStore.isReachable()) { - logger.warn( - String.format( - "update leader using switchLeader logic from store[%d] to store[%d]", - region.getLeader().getStoreId(), peer.getStoreId())); - // update region cache - region = regionManager.updateLeader(region, peer.getStoreId()); - // switch to leader store - store = currentLeaderStore; - updateClientStub(); - return true; + if (!exceptionEncountered) { + // all response returned normally, the leader is not elected, just wait until it is ready. + logger.warn( + String.format( + "leader for region[%d] is not elected, just wait until it is ready", + region.getId())); + return true; + } else { + // no leader found, some response does not return normally, there may be network partition. + logger.warn( + String.format( + "leader for region[%d] is not found, it is possible that network partition occurred", + region.getId())); + } + } else { + // we found a leader + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + if (currentLeaderStore.isReachable()) { + logger.warn( + String.format( + "update leader using switchLeader logic from store[%d] to store[%d]", + region.getLeader().getStoreId(), peer.getStoreId())); + // update region cache + region = regionManager.updateLeader(region, peer.getStoreId()); + // switch to leader store + store = currentLeaderStore; + updateClientStub(); + return true; + } } if (conf.getEnableGrpcForward()) { logger.warn(String.format("try grpc forward: region[%d]", region.getId())); @@ -210,7 +224,8 @@ private void updateClientStub() { } } - private Metapb.Peer switchLeader() { + // first: leader peer, second: true if any responses returned with grpc error + private Pair switchLeader() { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); @@ -228,6 +243,7 @@ private Metapb.Peer switchLeader() { ListenableFuture task = stub.rawGet(rawGetRequest); responses.add(new SwitchLeaderTask(task, peer)); } + boolean exceptionEncountered = false; while (true) { try { Thread.sleep(2); @@ -246,17 +262,18 @@ private Metapb.Peer switchLeader() { logger.info( String.format( "rawGet response indicates peer[%d] is leader", task.peer.getId())); - return task.peer; + return Pair.create(task.peer, exceptionEncountered); } } } catch (Exception ignored) { + exceptionEncountered = true; } } else { unfinished.add(task); } } if (unfinished.isEmpty()) { - return null; + return Pair.create(null, exceptionEncountered); } responses = unfinished; } From d145cb872eadad9aa79db6a58ed0989262e20820 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Mon, 15 Nov 2021 00:54:16 +0800 Subject: [PATCH 12/21] add metrics Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 129 ++++++++++-------- 1 file changed, 75 insertions(+), 54 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 4606a3b6273..91d637dd79c 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -28,6 +28,7 @@ import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.MetadataUtils; +import io.prometheus.client.Histogram; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -48,6 +49,18 @@ public abstract class AbstractRegionStoreClient implements RegionErrorReceiver { private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); + public static final Histogram SWITCH_LEADER_DURATION = + Histogram.build() + .name("client_java_switch_leader_duration") + .help("switch leader duration.") + .register(); + + public static final Histogram GRPC_FORWARD_DURATION = + Histogram.build() + .name("client_java_grpc_forward_duration") + .help("grpc forward duration.") + .register(); + protected final RegionManager regionManager; protected TiRegion region; protected TiStore store; @@ -137,64 +150,75 @@ public boolean onStoreUnreachable() { } } - List peers = region.getFollowerList(); - if (peers.isEmpty()) { - // no followers available, retry - logger.warn(String.format("no followers of region[%d] available, retry", region.getId())); - regionManager.onRequestFail(region); - return false; - } + Histogram.Timer switchLeaderDurationTimer = SWITCH_LEADER_DURATION.startTimer(); + try { + List peers = region.getFollowerList(); + if (peers.isEmpty()) { + // no followers available, retry + logger.warn(String.format("no followers of region[%d] available, retry", region.getId())); + regionManager.onRequestFail(region); + return false; + } - logger.warn(String.format("try switch leader: region[%d]", region.getId())); + logger.warn(String.format("try switch leader: region[%d]", region.getId())); - Pair pair = switchLeader(); - Metapb.Peer peer = pair.first; - boolean exceptionEncountered = pair.second; - if (peer == null) { - if (!exceptionEncountered) { - // all response returned normally, the leader is not elected, just wait until it is ready. - logger.warn( - String.format( - "leader for region[%d] is not elected, just wait until it is ready", - region.getId())); - return true; + Pair pair = switchLeader(); + Metapb.Peer peer = pair.first; + boolean exceptionEncountered = pair.second; + if (peer == null) { + if (!exceptionEncountered) { + // all response returned normally, the leader is not elected, just wait until it is ready. + logger.warn( + String.format( + "leader for region[%d] is not elected, just wait until it is ready", + region.getId())); + return true; + } else { + // no leader found, some response does not return normally, there may be network + // partition. + logger.warn( + String.format( + "leader for region[%d] is not found, it is possible that network partition occurred", + region.getId())); + } } else { - // no leader found, some response does not return normally, there may be network partition. - logger.warn( - String.format( - "leader for region[%d] is not found, it is possible that network partition occurred", - region.getId())); - } - } else { - // we found a leader - TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); - if (currentLeaderStore.isReachable()) { - logger.warn( - String.format( - "update leader using switchLeader logic from store[%d] to store[%d]", - region.getLeader().getStoreId(), peer.getStoreId())); - // update region cache - region = regionManager.updateLeader(region, peer.getStoreId()); - // switch to leader store - store = currentLeaderStore; - updateClientStub(); - return true; + // we found a leader + TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); + if (currentLeaderStore.isReachable()) { + logger.warn( + String.format( + "update leader using switchLeader logic from store[%d] to store[%d]", + region.getLeader().getStoreId(), peer.getStoreId())); + // update region cache + region = regionManager.updateLeader(region, peer.getStoreId()); + // switch to leader store + store = currentLeaderStore; + updateClientStub(); + return true; + } } + } finally { + switchLeaderDurationTimer.observeDuration(); } if (conf.getEnableGrpcForward()) { - logger.warn(String.format("try grpc forward: region[%d]", region.getId())); - // when current leader cannot be reached - TiStore storeWithProxy = switchProxyStore(); - if (storeWithProxy == null) { - // no store available, retry - logger.warn(String.format("No store available, retry: region[%d]", region.getId())); - return false; + Histogram.Timer grpcForwardDurationTimer = GRPC_FORWARD_DURATION.startTimer(); + try { + logger.warn(String.format("try grpc forward: region[%d]", region.getId())); + // when current leader cannot be reached + TiStore storeWithProxy = switchProxyStore(); + if (storeWithProxy == null) { + // no store available, retry + logger.warn(String.format("No store available, retry: region[%d]", region.getId())); + return false; + } + // use proxy store to forward requests + regionManager.updateStore(store, storeWithProxy); + store = storeWithProxy; + updateClientStub(); + return true; + } finally { + grpcForwardDurationTimer.observeDuration(); } - // use proxy store to forward requests - regionManager.updateStore(store, storeWithProxy); - store = storeWithProxy; - updateClientStub(); - return true; } return false; } @@ -256,7 +280,6 @@ private Pair switchLeader() { try { Kvrpcpb.RawGetResponse resp = task.task.get(); if (resp != null) { - logger.info(String.format("rawGet response received from peer[%s]", task.peer)); if (!resp.hasRegionError()) { // the peer is leader logger.info( @@ -305,8 +328,6 @@ private TiStore switchProxyStore() { if (task.task.isDone()) { try { HealthCheckResponse resp = task.task.get(); - logger.info( - String.format("healthCheck response received from store[%d]", task.store.getId())); if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { logger.info( String.format( From 5dd4d0f655383d3c84aa84201c4a694b784af078 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Mon, 15 Nov 2021 14:53:36 +0800 Subject: [PATCH 13/21] fix grpc forward store Signed-off-by: birdstorm --- .../common/region/AbstractRegionStoreClient.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 91d637dd79c..de9e32f5e87 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -233,13 +233,16 @@ protected Kvrpcpb.Context makeContext(Set resolvedLocks, TiStoreType store private void updateClientStub() { String addressStr = store.getStore().getAddress(); + long deadline = timeout; if (store.getProxyStore() != null) { addressStr = store.getProxyStore().getAddress(); + deadline = conf.getForwardTimeout(); } ManagedChannel channel = channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newFutureStub(channel); + blockingStub = + TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); + asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); if (store.getProxyStore() != null) { Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); @@ -305,17 +308,17 @@ private Pair switchLeader() { private TiStore switchProxyStore() { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { - TiStore store = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); ManagedChannel channel = channelFactory.getChannel( - store.getAddress(), regionManager.getPDClient().getHostMapping()); + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); HealthGrpc.HealthFutureStub stub = HealthGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); ListenableFuture task = stub.check(req); - responses.add(new ForwardCheckTask(task, store)); + responses.add(new ForwardCheckTask(task, peerStore)); } while (true) { try { From 5cee37eaebf871403d3d6340ff45fd25a898c9da Mon Sep 17 00:00:00 2001 From: birdstorm Date: Mon, 15 Nov 2021 18:16:26 +0800 Subject: [PATCH 14/21] use rawget req rather than healthcheck req Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index de9e32f5e87..9d2c86a21b5 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -24,9 +24,6 @@ import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; import io.grpc.Metadata; -import io.grpc.health.v1.HealthCheckRequest; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.MetadataUtils; import io.prometheus.client.Histogram; import java.util.LinkedList; @@ -256,10 +253,10 @@ private Pair switchLeader() { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); - TiStore store = regionManager.getStoreById(peer.getStoreId()); + TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); ManagedChannel channel = channelFactory.getChannel( - store.getAddress(), regionManager.getPDClient().getHostMapping()); + peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); TikvGrpc.TikvFutureStub stub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); Kvrpcpb.RawGetRequest rawGetRequest = @@ -308,16 +305,22 @@ private Pair switchLeader() { private TiStore switchProxyStore() { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { + ByteString key = region.getStartKey(); TiStore peerStore = regionManager.getStoreById(peer.getStoreId()); ManagedChannel channel = channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); - HealthGrpc.HealthFutureStub stub = - HealthGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + TikvGrpc.TikvFutureStub stub = + TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); - HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); - ListenableFuture task = stub.check(req); + Kvrpcpb.RawGetRequest rawGetRequest = + Kvrpcpb.RawGetRequest.newBuilder() + .setContext(region.getReplicaContext(peer)) + .setKey(key) + .build(); + ListenableFuture task = + MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); responses.add(new ForwardCheckTask(task, peerStore)); } while (true) { @@ -330,14 +333,9 @@ private TiStore switchProxyStore() { for (ForwardCheckTask task : responses) { if (task.task.isDone()) { try { - HealthCheckResponse resp = task.task.get(); - if (resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING) { - logger.info( - String.format( - "healthCheck response indicates forward from remote[%s] to remote[%s]", - task.store.getAddress(), store.getAddress())); - return store.withProxy(task.store.getStore()); - } + // any answer will do + Kvrpcpb.RawGetResponse resp = task.task.get(); + return task.store; } catch (Exception ignored) { } } else { @@ -362,10 +360,10 @@ private SwitchLeaderTask(ListenableFuture task, Metapb.P } private static class ForwardCheckTask { - private final ListenableFuture task; + private final ListenableFuture task; private final TiStore store; - private ForwardCheckTask(ListenableFuture task, TiStore store) { + private ForwardCheckTask(ListenableFuture task, TiStore store) { this.task = task; this.store = store; } From 59d36ba93008f91e6624e0f669b6578a22ea5f21 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Mon, 15 Nov 2021 20:00:47 +0800 Subject: [PATCH 15/21] fix forward proxy Signed-off-by: birdstorm --- .../common/region/AbstractRegionStoreClient.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 9d2c86a21b5..5f6416ea6f9 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -303,6 +303,7 @@ private Pair switchLeader() { } private TiStore switchProxyStore() { + long forwardTimeout = conf.getForwardTimeout(); List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); @@ -311,7 +312,7 @@ private TiStore switchProxyStore() { channelFactory.getChannel( peerStore.getAddress(), regionManager.getPDClient().getHostMapping()); TikvGrpc.TikvFutureStub stub = - TikvGrpc.newFutureStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + TikvGrpc.newFutureStub(channel).withDeadlineAfter(forwardTimeout, TimeUnit.MILLISECONDS); Metadata header = new Metadata(); header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); Kvrpcpb.RawGetRequest rawGetRequest = @@ -321,7 +322,7 @@ private TiStore switchProxyStore() { .build(); ListenableFuture task = MetadataUtils.attachHeaders(stub, header).rawGet(rawGetRequest); - responses.add(new ForwardCheckTask(task, peerStore)); + responses.add(new ForwardCheckTask(task, peerStore.getStore())); } while (true) { try { @@ -335,7 +336,11 @@ private TiStore switchProxyStore() { try { // any answer will do Kvrpcpb.RawGetResponse resp = task.task.get(); - return task.store; + logger.info( + String.format( + "rawGetResponse indicates forward from [%s] to [%s]", + task.store.getAddress(), store.getAddress())); + return store.withProxy(task.store); } catch (Exception ignored) { } } else { @@ -361,9 +366,9 @@ private SwitchLeaderTask(ListenableFuture task, Metapb.P private static class ForwardCheckTask { private final ListenableFuture task; - private final TiStore store; + private final Metapb.Store store; - private ForwardCheckTask(ListenableFuture task, TiStore store) { + private ForwardCheckTask(ListenableFuture task, Metapb.Store store) { this.task = task; this.store = store; } From bca1188d110ddd60ce2a0d532e0a3d531b15dc87 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Thu, 18 Nov 2021 16:22:19 +0800 Subject: [PATCH 16/21] modify log level Signed-off-by: birdstorm --- .../tikv/common/region/AbstractRegionStoreClient.java | 10 +++++----- .../org/tikv/common/region/StoreHealthyChecker.java | 4 +--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 5f6416ea6f9..e02e0114d61 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -140,7 +140,7 @@ public boolean onStoreUnreachable() { if (store.getProxyStore() == null) { if (store.isReachable()) { - logger.warn( + logger.info( String.format( "store[%d] for region[%d] is reachable, retry", store.getId(), region.getId())); return true; @@ -157,7 +157,7 @@ public boolean onStoreUnreachable() { return false; } - logger.warn(String.format("try switch leader: region[%d]", region.getId())); + logger.info(String.format("try switch leader: region[%d]", region.getId())); Pair pair = switchLeader(); Metapb.Peer peer = pair.first; @@ -165,7 +165,7 @@ public boolean onStoreUnreachable() { if (peer == null) { if (!exceptionEncountered) { // all response returned normally, the leader is not elected, just wait until it is ready. - logger.warn( + logger.info( String.format( "leader for region[%d] is not elected, just wait until it is ready", region.getId())); @@ -182,7 +182,7 @@ public boolean onStoreUnreachable() { // we found a leader TiStore currentLeaderStore = regionManager.getStoreById(peer.getStoreId()); if (currentLeaderStore.isReachable()) { - logger.warn( + logger.info( String.format( "update leader using switchLeader logic from store[%d] to store[%d]", region.getLeader().getStoreId(), peer.getStoreId())); @@ -200,7 +200,7 @@ public boolean onStoreUnreachable() { if (conf.getEnableGrpcForward()) { Histogram.Timer grpcForwardDurationTimer = GRPC_FORWARD_DURATION.startTimer(); try { - logger.warn(String.format("try grpc forward: region[%d]", region.getId())); + logger.info(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached TiStore storeWithProxy = switchProxyStore(); if (storeWithProxy == null) { diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index 43190718bae..546c08a9699 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -117,9 +117,7 @@ public void run() { } } else { if (!store.isReachable()) { - logger.warn( - String.format( - "store [%s] recovers to be reachable and canforward", store.getAddress())); + logger.warn(String.format("store [%s] recovers to be reachable", store.getAddress())); store.markReachable(); } } From 0e0c4d935fa92c6a7ef1659cd4c5be5e1309299b Mon Sep 17 00:00:00 2001 From: birdstorm Date: Thu, 18 Nov 2021 16:35:14 +0800 Subject: [PATCH 17/21] rename func Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 104 ++++++++++-------- 1 file changed, 58 insertions(+), 46 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index e02e0114d61..3cb9df6a951 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -147,6 +147,45 @@ public boolean onStoreUnreachable() { } } + Boolean result = trySwitchLeader(); + if (result != null) { + return result; + } + if (conf.getEnableGrpcForward()) { + return tryGrpcForward(); + } + return false; + } + + protected Kvrpcpb.Context makeContext(TiStoreType storeType) { + return region.getReplicaContext(java.util.Collections.emptySet(), storeType); + } + + protected Kvrpcpb.Context makeContext(Set resolvedLocks, TiStoreType storeType) { + return region.getReplicaContext(resolvedLocks, storeType); + } + + private void updateClientStub() { + String addressStr = store.getStore().getAddress(); + long deadline = timeout; + if (store.getProxyStore() != null) { + addressStr = store.getProxyStore().getAddress(); + deadline = conf.getForwardTimeout(); + } + ManagedChannel channel = + channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); + blockingStub = + TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); + asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); + if (store.getProxyStore() != null) { + Metadata header = new Metadata(); + header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); + blockingStub = MetadataUtils.attachHeaders(blockingStub, header); + asyncStub = MetadataUtils.attachHeaders(asyncStub, header); + } + } + + private Boolean trySwitchLeader() { Histogram.Timer switchLeaderDurationTimer = SWITCH_LEADER_DURATION.startTimer(); try { List peers = region.getFollowerList(); @@ -197,54 +236,27 @@ public boolean onStoreUnreachable() { } finally { switchLeaderDurationTimer.observeDuration(); } - if (conf.getEnableGrpcForward()) { - Histogram.Timer grpcForwardDurationTimer = GRPC_FORWARD_DURATION.startTimer(); - try { - logger.info(String.format("try grpc forward: region[%d]", region.getId())); - // when current leader cannot be reached - TiStore storeWithProxy = switchProxyStore(); - if (storeWithProxy == null) { - // no store available, retry - logger.warn(String.format("No store available, retry: region[%d]", region.getId())); - return false; - } - // use proxy store to forward requests - regionManager.updateStore(store, storeWithProxy); - store = storeWithProxy; - updateClientStub(); - return true; - } finally { - grpcForwardDurationTimer.observeDuration(); - } - } - return false; - } - - protected Kvrpcpb.Context makeContext(TiStoreType storeType) { - return region.getReplicaContext(java.util.Collections.emptySet(), storeType); + return null; } - protected Kvrpcpb.Context makeContext(Set resolvedLocks, TiStoreType storeType) { - return region.getReplicaContext(resolvedLocks, storeType); - } - - private void updateClientStub() { - String addressStr = store.getStore().getAddress(); - long deadline = timeout; - if (store.getProxyStore() != null) { - addressStr = store.getProxyStore().getAddress(); - deadline = conf.getForwardTimeout(); - } - ManagedChannel channel = - channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); - blockingStub = - TikvGrpc.newBlockingStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); - asyncStub = TikvGrpc.newFutureStub(channel).withDeadlineAfter(deadline, TimeUnit.MILLISECONDS); - if (store.getProxyStore() != null) { - Metadata header = new Metadata(); - header.put(TiConfiguration.FORWARD_META_DATA_KEY, store.getStore().getAddress()); - blockingStub = MetadataUtils.attachHeaders(blockingStub, header); - asyncStub = MetadataUtils.attachHeaders(asyncStub, header); + private boolean tryGrpcForward() { + Histogram.Timer grpcForwardDurationTimer = GRPC_FORWARD_DURATION.startTimer(); + try { + logger.info(String.format("try grpc forward: region[%d]", region.getId())); + // when current leader cannot be reached + TiStore storeWithProxy = switchProxyStore(); + if (storeWithProxy == null) { + // no store available, retry + logger.warn(String.format("No store available, retry: region[%d]", region.getId())); + return false; + } + // use proxy store to forward requests + regionManager.updateStore(store, storeWithProxy); + store = storeWithProxy; + updateClientStub(); + return true; + } finally { + grpcForwardDurationTimer.observeDuration(); } } From 56aaf0bc16cda1dba8dcf3e896c811c3d6e941ea Mon Sep 17 00:00:00 2001 From: birdstorm Date: Thu, 18 Nov 2021 16:39:19 +0800 Subject: [PATCH 18/21] resolve comment Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 3cb9df6a951..ef5f5162d3e 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -288,23 +288,22 @@ private Pair switchLeader() { } List unfinished = new LinkedList<>(); for (SwitchLeaderTask task : responses) { - if (task.task.isDone()) { - try { - Kvrpcpb.RawGetResponse resp = task.task.get(); - if (resp != null) { - if (!resp.hasRegionError()) { - // the peer is leader - logger.info( - String.format( - "rawGet response indicates peer[%d] is leader", task.peer.getId())); - return Pair.create(task.peer, exceptionEncountered); - } + if (!task.task.isDone()) { + unfinished.add(task); + } + try { + Kvrpcpb.RawGetResponse resp = task.task.get(); + if (resp != null) { + if (!resp.hasRegionError()) { + // the peer is leader + logger.info( + String.format( + "rawGet response indicates peer[%d] is leader", task.peer.getId())); + return Pair.create(task.peer, exceptionEncountered); } - } catch (Exception ignored) { - exceptionEncountered = true; } - } else { - unfinished.add(task); + } catch (Exception ignored) { + exceptionEncountered = true; } } if (unfinished.isEmpty()) { @@ -344,20 +343,19 @@ private TiStore switchProxyStore() { } List unfinished = new LinkedList<>(); for (ForwardCheckTask task : responses) { - if (task.task.isDone()) { - try { - // any answer will do - Kvrpcpb.RawGetResponse resp = task.task.get(); - logger.info( - String.format( - "rawGetResponse indicates forward from [%s] to [%s]", - task.store.getAddress(), store.getAddress())); - return store.withProxy(task.store); - } catch (Exception ignored) { - } - } else { + if (!task.task.isDone()) { unfinished.add(task); } + try { + // any answer will do + Kvrpcpb.RawGetResponse resp = task.task.get(); + logger.info( + String.format( + "rawGetResponse indicates forward from [%s] to [%s]", + task.store.getAddress(), store.getAddress())); + return store.withProxy(task.store); + } catch (Exception ignored) { + } } if (unfinished.isEmpty()) { return null; From 06d1db2ae724fe6aff6c4ebbd78d38fee6f083b0 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Thu, 18 Nov 2021 16:56:53 +0800 Subject: [PATCH 19/21] rename metrics and method name Signed-off-by: birdstorm --- .../region/AbstractRegionStoreClient.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index ef5f5162d3e..75831d893ae 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -46,16 +46,16 @@ public abstract class AbstractRegionStoreClient implements RegionErrorReceiver { private static final Logger logger = LoggerFactory.getLogger(AbstractRegionStoreClient.class); - public static final Histogram SWITCH_LEADER_DURATION = + public static final Histogram SEEK_LEADER_STORE_DURATION = Histogram.build() - .name("client_java_switch_leader_duration") - .help("switch leader duration.") + .name("client_java_seek_leader_store_duration") + .help("seek leader store duration.") .register(); - public static final Histogram GRPC_FORWARD_DURATION = + public static final Histogram SEEK_PROXY_STORE_DURATION = Histogram.build() - .name("client_java_grpc_forward_duration") - .help("grpc forward duration.") + .name("client_java_seek_proxy_store_duration") + .help("seek proxy store duration.") .register(); protected final RegionManager regionManager; @@ -147,12 +147,14 @@ public boolean onStoreUnreachable() { } } - Boolean result = trySwitchLeader(); + // seek an available leader store to send request + Boolean result = seekLeaderStore(); if (result != null) { return result; } if (conf.getEnableGrpcForward()) { - return tryGrpcForward(); + // seek an available proxy store to forward request + return seekProxyStore(); } return false; } @@ -185,8 +187,8 @@ private void updateClientStub() { } } - private Boolean trySwitchLeader() { - Histogram.Timer switchLeaderDurationTimer = SWITCH_LEADER_DURATION.startTimer(); + private Boolean seekLeaderStore() { + Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); try { List peers = region.getFollowerList(); if (peers.isEmpty()) { @@ -198,7 +200,7 @@ private Boolean trySwitchLeader() { logger.info(String.format("try switch leader: region[%d]", region.getId())); - Pair pair = switchLeader(); + Pair pair = switchLeaderStore(); Metapb.Peer peer = pair.first; boolean exceptionEncountered = pair.second; if (peer == null) { @@ -239,8 +241,8 @@ private Boolean trySwitchLeader() { return null; } - private boolean tryGrpcForward() { - Histogram.Timer grpcForwardDurationTimer = GRPC_FORWARD_DURATION.startTimer(); + private boolean seekProxyStore() { + Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); try { logger.info(String.format("try grpc forward: region[%d]", region.getId())); // when current leader cannot be reached @@ -261,7 +263,7 @@ private boolean tryGrpcForward() { } // first: leader peer, second: true if any responses returned with grpc error - private Pair switchLeader() { + private Pair switchLeaderStore() { List responses = new LinkedList<>(); for (Metapb.Peer peer : region.getFollowerList()) { ByteString key = region.getStartKey(); From 0116e63b11aa4575f4465cbca758069d0b91776c Mon Sep 17 00:00:00 2001 From: birdstorm Date: Thu, 18 Nov 2021 17:10:21 +0800 Subject: [PATCH 20/21] add continue Signed-off-by: birdstorm --- .../org/tikv/common/region/AbstractRegionStoreClient.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index 75831d893ae..f72146c5957 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -138,7 +138,7 @@ public boolean onStoreUnreachable() { return true; } - if (store.getProxyStore() == null) { + if (store.getProxyStore() == null && store.isReachable()) { if (store.isReachable()) { logger.info( String.format( @@ -292,6 +292,7 @@ private Pair switchLeaderStore() { for (SwitchLeaderTask task : responses) { if (!task.task.isDone()) { unfinished.add(task); + continue; } try { Kvrpcpb.RawGetResponse resp = task.task.get(); @@ -347,6 +348,7 @@ private TiStore switchProxyStore() { for (ForwardCheckTask task : responses) { if (!task.task.isDone()) { unfinished.add(task); + continue; } try { // any answer will do From bd20559f41008a450886ce037d73abf5e139803c Mon Sep 17 00:00:00 2001 From: birdstorm Date: Thu, 18 Nov 2021 18:01:32 +0800 Subject: [PATCH 21/21] format Signed-off-by: birdstorm --- .../java/org/tikv/common/region/AbstractRegionStoreClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index f72146c5957..96e620279aa 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -300,8 +300,7 @@ private Pair switchLeaderStore() { if (!resp.hasRegionError()) { // the peer is leader logger.info( - String.format( - "rawGet response indicates peer[%d] is leader", task.peer.getId())); + String.format("rawGet response indicates peer[%d] is leader", task.peer.getId())); return Pair.create(task.peer, exceptionEncountered); } }