From 3d28c60b69dd21f1eb3212c1d722f4affc8ef761 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 27 Jul 2022 16:26:14 +0800 Subject: [PATCH 1/8] constraint the getMember timeout by inject a backoffer Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/PDClient.java | 17 +++++++++++------ .../tikv/common/operation/PDErrorHandler.java | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index cfafbff4550..e2530534807 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -440,6 +440,8 @@ PDClientWrapper getPdClientWrapper() { private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { while (true) { + backOffer.checkTimeout(); + try { ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); PDGrpc.PDBlockingStub stub = @@ -459,8 +461,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } } - private GetMembersResponse getMembers(URI uri) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { try { return doGetMembers(backOffer, uri); } catch (Exception e) { @@ -516,13 +517,13 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l return true; } - public synchronized void updateLeaderOrForwardFollower() { + public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(url); + GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { continue; } @@ -578,7 +579,7 @@ public synchronized void updateLeaderOrForwardFollower() { public void tryUpdateLeader() { for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(url); + GetMembersResponse resp = getMembers(defaultBackOffer(), url); if (resp == null) { continue; } @@ -705,7 +706,7 @@ private void initCluster() { this.timeout = conf.getPdFirstGetMemberTimeout(); for (URI u : pdAddrs) { logger.info("get members with pd " + u + ": start"); - resp = getMembers(u); + resp = getMembers(defaultBackOffer(), u); logger.info("get members with pd " + u + ": end"); if (resp != null) { break; @@ -825,4 +826,8 @@ public List getPdAddrs() { public RequestKeyCodec getCodec() { return codec; } + + private static BackOffer defaultBackOffer() { + return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + } } diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index df403789503..4f9cc7fbabb 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -59,7 +59,7 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { case PD_ERROR: backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); - client.updateLeaderOrForwardFollower(); + client.updateLeaderOrForwardFollower(backOffer); return true; case REGION_PEER_NOT_ELECTED: logger.debug(error.getMessage()); @@ -80,7 +80,7 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) { return false; } backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - client.updateLeaderOrForwardFollower(); + client.updateLeaderOrForwardFollower(backOffer); return true; } } From 743415b4e85b27b6cbc4a19fd4e5813649e746f8 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Sat, 30 Jul 2022 00:57:13 +0800 Subject: [PATCH 2/8] wip: use try lock Signed-off-by: iosmanthus --- .../org/tikv/common/AbstractGRPCClient.java | 10 +++-- src/main/java/org/tikv/common/PDClient.java | 37 +++++++++++++++---- .../tikv/common/operation/PDErrorHandler.java | 15 +++++++- .../tikv/common/region/RegionStoreClient.java | 20 ++++------ 4 files changed, 58 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 616bcfbaebe..342b215f668 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -33,6 +33,7 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.operation.ErrorHandler; import org.tikv.common.policy.RetryMaxMs.Builder; import org.tikv.common.policy.RetryPolicy; @@ -40,7 +41,6 @@ import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; -import org.tikv.common.util.ConcreteBackOffer; public abstract class AbstractGRPCClient< BlockingStubT extends AbstractStub, @@ -183,6 +183,8 @@ public long getTimeout() { private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { while (true) { + backOffer.checkTimeout(); + try { ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); HealthGrpc.HealthBlockingStub stub = @@ -198,12 +200,14 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin } } - protected boolean checkHealth(String addressStr, HostMapping hostMapping) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff((int) (timeout * 2)); + protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { + SlowLogSpan checkHealthSpan = backOffer.getSlowLog().start("check_health"); try { return doCheckHealth(backOffer, addressStr, hostMapping); } catch (Exception e) { return false; + } finally { + checkHealthSpan.end(); } } } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e2530534807..73f9648afa8 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -53,6 +53,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.http.client.methods.CloseableHttpResponse; @@ -66,6 +67,7 @@ import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.operation.NoopHandler; import org.tikv.common.operation.PDErrorHandler; @@ -123,6 +125,8 @@ public class PDClient extends AbstractGRPCClient private HostMapping hostMapping; private long lastUpdateLeaderTime; + private final ReentrantLock leaderLock = new ReentrantLock(); + public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = HistogramUtils.buildDuration() .name("client_java_pd_get_region_by_requests_latency") @@ -462,10 +466,13 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { + SlowLogSpan getMembersSpan = backOffer.getSlowLog().start("get_members"); try { return doGetMembers(backOffer, uri); } catch (Exception e) { return null; + } finally { + getMembersSpan.end(); } } @@ -498,11 +505,12 @@ private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) { return true; } - synchronized boolean createFollowerClientWrapper(String followerUrlStr, String leaderUrls) { + synchronized boolean createFollowerClientWrapper( + BackOffer backOffer, String followerUrlStr, String leaderUrls) { // TODO: Why not strip protocol info on server side since grpc does not need it try { - if (!checkHealth(followerUrlStr, hostMapping)) { + if (!checkHealth(backOffer, followerUrlStr, hostMapping)) { return false; } @@ -517,7 +525,19 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l return true; } - public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { + public void tryUpdateLeaderOrForwardFollower(BackOffer backOffer) { + if (leaderLock.tryLock()) { + SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader"); + try { + updateLeaderOrForwardFollower(backOffer); + } finally { + leaderLock.unlock(); + updateLeaderSpan.end(); + } + } + } + + private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } @@ -535,7 +555,8 @@ public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // if leader is switched, just return. - if (checkHealth(leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) + && createLeaderClientWrapper(leaderUrlStr)) { lastUpdateLeaderTime = System.currentTimeMillis(); return; } @@ -562,7 +583,8 @@ public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { hasReachNextMember = true; continue; } - if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) { + if (hasReachNextMember + && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { logger.warn( String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr)); return; @@ -578,8 +600,9 @@ public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { public void tryUpdateLeader() { for (URI url : this.pdAddrs) { + BackOffer backOffer = defaultBackOffer(); // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(defaultBackOffer(), url); + GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { continue; } @@ -592,7 +615,7 @@ public void tryUpdateLeader() { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // If leader is not change but becomes available, we can cancel follower forward. - if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { if (!urls.equals(this.pdAddrs)) { tryUpdateMembers(urls); } diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index 4f9cc7fbabb..f63e0eca001 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -26,6 +26,7 @@ import org.tikv.common.PDClient; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.pd.PDError; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; @@ -59,7 +60,12 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { case PD_ERROR: backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); - client.updateLeaderOrForwardFollower(backOffer); + SlowLogSpan tryUpdateLeaderSpan = backOffer.getSlowLog().start("try_update_leader"); + try { + client.tryUpdateLeaderOrForwardFollower(backOffer); + } finally { + tryUpdateLeaderSpan.end(); + } return true; case REGION_PEER_NOT_ELECTED: logger.debug(error.getMessage()); @@ -80,7 +86,12 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) { return false; } backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - client.updateLeaderOrForwardFollower(backOffer); + SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("update_leader"); + try { + client.tryUpdateLeaderOrForwardFollower(backOffer); + } finally { + updateLeaderSpan.end(); + } return true; } } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index ccc820891f1..ba742c872b0 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -1408,38 +1408,34 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store this); } - public synchronized RegionStoreClient build(TiRegion region, TiStore store) - throws GrpcException { + public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException { return build(region, store, TiStoreType.TiKV); } - public synchronized RegionStoreClient build(ByteString key) throws GrpcException { + public RegionStoreClient build(ByteString key) throws GrpcException { return build(key, TiStoreType.TiKV); } - public synchronized RegionStoreClient build(ByteString key, BackOffer backOffer) - throws GrpcException { + public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException { return build(key, TiStoreType.TiKV, backOffer); } - public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType) - throws GrpcException { + public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException { return build(key, storeType, defaultBackOff()); } - public synchronized RegionStoreClient build( - ByteString key, TiStoreType storeType, BackOffer backOffer) throws GrpcException { + public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer) + throws GrpcException { Pair pair = regionManager.getRegionStorePairByKey(key, storeType, backOffer); return build(pair.first, pair.second, storeType); } - public synchronized RegionStoreClient build(TiRegion region) throws GrpcException { + public RegionStoreClient build(TiRegion region) throws GrpcException { return build(region, defaultBackOff()); } - public synchronized RegionStoreClient build(TiRegion region, BackOffer backOffer) - throws GrpcException { + public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException { TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer); return build(region, store, TiStoreType.TiKV); } From 55df88836e0232361d213f2fb15265b5c85f3af0 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Sat, 30 Jul 2022 03:22:29 +0800 Subject: [PATCH 3/8] wip: async update leader Signed-off-by: iosmanthus --- .../org/tikv/common/AbstractGRPCClient.java | 10 +---- src/main/java/org/tikv/common/PDClient.java | 44 +++++++++---------- .../tikv/common/operation/PDErrorHandler.java | 6 +-- 3 files changed, 26 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 342b215f668..ac68552f7e2 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -33,7 +33,6 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.log.SlowLogSpan; import org.tikv.common.operation.ErrorHandler; import org.tikv.common.policy.RetryMaxMs.Builder; import org.tikv.common.policy.RetryPolicy; @@ -201,13 +200,6 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin } protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { - SlowLogSpan checkHealthSpan = backOffer.getSlowLog().start("check_health"); - try { - return doCheckHealth(backOffer, addressStr, hostMapping); - } catch (Exception e) { - return false; - } finally { - checkHealthSpan.end(); - } + return doCheckHealth(backOffer, addressStr, hostMapping); } } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 73f9648afa8..4171a2611c4 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -50,10 +50,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.http.client.methods.CloseableHttpResponse; @@ -67,7 +68,6 @@ import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; -import org.tikv.common.log.SlowLogSpan; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.operation.NoopHandler; import org.tikv.common.operation.PDErrorHandler; @@ -124,8 +124,8 @@ public class PDClient extends AbstractGRPCClient private ConcurrentMap tiflashReplicaMap; private HostMapping hostMapping; private long lastUpdateLeaderTime; - - private final ReentrantLock leaderLock = new ReentrantLock(); + private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor(); + private final AtomicBoolean updateLeaderNotify = new AtomicBoolean(); public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = HistogramUtils.buildDuration() @@ -430,6 +430,8 @@ public void close() throws InterruptedException { if (channelFactory != null) { channelFactory.close(); } + + updateLeaderService.shutdownNow(); } @VisibleForTesting @@ -466,14 +468,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { - SlowLogSpan getMembersSpan = backOffer.getSlowLog().start("get_members"); - try { - return doGetMembers(backOffer, uri); - } catch (Exception e) { - return null; - } finally { - getMembersSpan.end(); - } + return doGetMembers(backOffer, uri); } // return whether the leader has changed to target address `leaderUrlStr`. @@ -525,15 +520,20 @@ synchronized boolean createFollowerClientWrapper( return true; } - public void tryUpdateLeaderOrForwardFollower(BackOffer backOffer) { - if (leaderLock.tryLock()) { - SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader"); - try { - updateLeaderOrForwardFollower(backOffer); - } finally { - leaderLock.unlock(); - updateLeaderSpan.end(); - } + public void tryUpdateLeaderOrForwardFollower() { + logger.error("here!!!!!"); + if (updateLeaderNotify.compareAndSet(false, true)) { + logger.error("there!!!"); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + updateLeaderService.submit( + () -> { + logger.error("try_update!!!"); + try { + updateLeaderOrForwardFollower(backOffer); + } finally { + updateLeaderNotify.set(false); + } + }); } } @@ -687,7 +687,7 @@ public double getTiFlashReplicaProgress(long tableId) { } @Override - protected PDBlockingStub getBlockingStub() { + protected synchronized PDBlockingStub getBlockingStub() { if (pdClientWrapper == null) { throw new GrpcException("PDClient may not be initialized"); } diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index f63e0eca001..76c6a768644 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -62,7 +62,7 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); SlowLogSpan tryUpdateLeaderSpan = backOffer.getSlowLog().start("try_update_leader"); try { - client.tryUpdateLeaderOrForwardFollower(backOffer); + client.tryUpdateLeaderOrForwardFollower(); } finally { tryUpdateLeaderSpan.end(); } @@ -86,9 +86,9 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) { return false; } backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("update_leader"); + SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader"); try { - client.tryUpdateLeaderOrForwardFollower(backOffer); + client.tryUpdateLeaderOrForwardFollower(); } finally { updateLeaderSpan.end(); } From d2bf550a6e06ad948b61fb7eb3201c05153d6409 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Sat, 30 Jul 2022 03:55:04 +0800 Subject: [PATCH 4/8] remove sync keyword for get stub Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/PDClient.java | 27 ++++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 4171a2611c4..6180e1afb86 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -52,6 +52,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -521,19 +522,21 @@ synchronized boolean createFollowerClientWrapper( } public void tryUpdateLeaderOrForwardFollower() { - logger.error("here!!!!!"); if (updateLeaderNotify.compareAndSet(false, true)) { - logger.error("there!!!"); BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); - updateLeaderService.submit( - () -> { - logger.error("try_update!!!"); - try { - updateLeaderOrForwardFollower(backOffer); - } finally { - updateLeaderNotify.set(false); - } - }); + try { + updateLeaderService.submit( + () -> { + try { + updateLeaderOrForwardFollower(backOffer); + } finally { + updateLeaderNotify.set(false); + } + }); + } catch (RejectedExecutionException e) { + logger.error("PDClient is shutdown", e); + updateLeaderNotify.set(false); + } } } @@ -687,7 +690,7 @@ public double getTiFlashReplicaProgress(long tableId) { } @Override - protected synchronized PDBlockingStub getBlockingStub() { + protected PDBlockingStub getBlockingStub() { if (pdClientWrapper == null) { throw new GrpcException("PDClient may not be initialized"); } From 71b5772597158b5c5a4c67f7ae06f7813978c527 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Sat, 30 Jul 2022 10:31:01 +0800 Subject: [PATCH 5/8] remove try with in test Signed-off-by: iosmanthus --- .../org/tikv/common/PDClientV2MockTest.java | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/src/test/java/org/tikv/common/PDClientV2MockTest.java b/src/test/java/org/tikv/common/PDClientV2MockTest.java index 305cd101c38..af9884b415b 100644 --- a/src/test/java/org/tikv/common/PDClientV2MockTest.java +++ b/src/test/java/org/tikv/common/PDClientV2MockTest.java @@ -88,18 +88,16 @@ public void testGetRegionById() throws Exception { String start = "getRegionById"; String end = "getRegionByIdEnd"; leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, end)); - try (PDClient client = createClient()) { - Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; - Assert.assertEquals(start, r.getStartKey().toStringUtf8()); - Assert.assertEquals(end, r.getEndKey().toStringUtf8()); - } + PDClient client = createClient(); + Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; + Assert.assertEquals(start, r.getStartKey().toStringUtf8()); + Assert.assertEquals(end, r.getEndKey().toStringUtf8()); leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, "")); - try (PDClient client = createClient()) { - Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; - Assert.assertEquals(start, r.getStartKey().toStringUtf8()); - Assert.assertEquals("", r.getEndKey().toStringUtf8()); - } + + r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; + Assert.assertEquals(start, r.getStartKey().toStringUtf8()); + Assert.assertEquals("", r.getEndKey().toStringUtf8()); } @Test @@ -113,15 +111,14 @@ public void testScanRegions() throws Exception { .addRegions(Pdpb.Region.newBuilder().setRegion(makeRegion(start, end)).build()) .build()); - try (PDClient client = createClient()) { - List regions = - client.scanRegions( - ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1); + PDClient client = createClient(); + List regions = + client.scanRegions( + ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1); - for (Region r : regions) { - Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8()); - Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8()); - } + for (Region r : regions) { + Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8()); + Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8()); } } } From 360a3c7d24aeec3caa1630d38bb90d3e3b412dc4 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Sat, 30 Jul 2022 19:22:59 +0800 Subject: [PATCH 6/8] fix potential NPE in region manager Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/region/RegionManager.java | 2 -- src/test/java/org/tikv/common/TimeoutTest.java | 7 +++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 8c1624e42e0..7f21ca8d20e 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -138,8 +138,6 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { region = cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); } - } catch (Exception e) { - return null; } finally { requestTimer.observeDuration(); slowLogSpan.end(); diff --git a/src/test/java/org/tikv/common/TimeoutTest.java b/src/test/java/org/tikv/common/TimeoutTest.java index e0f3d01d1ce..33642c2ccba 100644 --- a/src/test/java/org/tikv/common/TimeoutTest.java +++ b/src/test/java/org/tikv/common/TimeoutTest.java @@ -52,9 +52,12 @@ public void testTimeoutInTime() { try (RawKVClient client = createClient()) { pdServers.get(0).stop(); long start = System.currentTimeMillis(); - client.get(ByteString.copyFromUtf8("key")); + try { + client.get(ByteString.copyFromUtf8("key")); + } catch (Exception ignore) { + } long end = System.currentTimeMillis(); - Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L); + Assert.assertTrue(end - start < (session.getConf().getRawKVReadTimeoutInMS() * 1.5)); } } } From 4d96e5f1cd168912db0bf5610876507e92881f20 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Sat, 30 Jul 2022 23:08:52 +0800 Subject: [PATCH 7/8] fix id mismatch Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/region/RegionManager.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 7f21ca8d20e..44c81375107 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -138,6 +138,8 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { region = cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); } + } catch (Exception e) { + return null; } finally { requestTimer.observeDuration(); slowLogSpan.end(); @@ -180,7 +182,7 @@ public Pair getRegionStorePairByKey(ByteString key, TiStoreTy public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { TiRegion region = getRegionByKey(key, backOffer); - if (!region.isValid()) { + if (region == null || !region.isValid()) { throw new TiClientInternalException("Region invalid: " + region); } From 54ecb43f63871ad55690bebf77d4f795a22ee8b6 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Sun, 31 Jul 2022 20:28:55 +0800 Subject: [PATCH 8/8] move create backoffer in try block Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/PDClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 6180e1afb86..1568a78e0ef 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -523,8 +523,8 @@ synchronized boolean createFollowerClientWrapper( public void tryUpdateLeaderOrForwardFollower() { if (updateLeaderNotify.compareAndSet(false, true)) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); try { + BackOffer backOffer = defaultBackOffer(); updateLeaderService.submit( () -> { try {