From 3d28c60b69dd21f1eb3212c1d722f4affc8ef761 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 27 Jul 2022 16:26:14 +0800 Subject: [PATCH 1/5] constraint the getMember timeout by inject a backoffer Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/PDClient.java | 17 +++++++++++------ .../tikv/common/operation/PDErrorHandler.java | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index cfafbff4550..e2530534807 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -440,6 +440,8 @@ PDClientWrapper getPdClientWrapper() { private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { while (true) { + backOffer.checkTimeout(); + try { ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); PDGrpc.PDBlockingStub stub = @@ -459,8 +461,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } } - private GetMembersResponse getMembers(URI uri) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { try { return doGetMembers(backOffer, uri); } catch (Exception e) { @@ -516,13 +517,13 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l return true; } - public synchronized void updateLeaderOrForwardFollower() { + public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(url); + GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { continue; } @@ -578,7 +579,7 @@ public synchronized void updateLeaderOrForwardFollower() { public void tryUpdateLeader() { for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(url); + GetMembersResponse resp = getMembers(defaultBackOffer(), url); if (resp == null) { continue; } @@ -705,7 +706,7 @@ private void initCluster() { this.timeout = conf.getPdFirstGetMemberTimeout(); for (URI u : pdAddrs) { logger.info("get members with pd " + u + ": start"); - resp = getMembers(u); + resp = getMembers(defaultBackOffer(), u); logger.info("get members with pd " + u + ": end"); if (resp != null) { break; @@ -825,4 +826,8 @@ public List getPdAddrs() { public RequestKeyCodec getCodec() { return codec; } + + private static BackOffer defaultBackOffer() { + return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + } } diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index df403789503..4f9cc7fbabb 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -59,7 +59,7 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { case PD_ERROR: backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); - client.updateLeaderOrForwardFollower(); + client.updateLeaderOrForwardFollower(backOffer); return true; case REGION_PEER_NOT_ELECTED: logger.debug(error.getMessage()); @@ -80,7 +80,7 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) { return false; } backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - client.updateLeaderOrForwardFollower(); + client.updateLeaderOrForwardFollower(backOffer); return true; } } From 7e5da2ddea87de2a9da7e8a80a382746be8759fe Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 27 Jul 2022 17:03:17 +0800 Subject: [PATCH 2/5] fix checkHealth unlimited retry Signed-off-by: iosmanthus --- .../org/tikv/common/AbstractGRPCClient.java | 3 +- src/main/java/org/tikv/common/PDClient.java | 10 ++-- .../org/tikv/common/PDMockServerTest.java | 11 ++++ .../java/org/tikv/common/TimeoutTest.java | 59 +++++++++++++++++++ 4 files changed, 76 insertions(+), 7 deletions(-) create mode 100644 src/test/java/org/tikv/common/TimeoutTest.java diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 616bcfbaebe..e23ae9be9d8 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -198,8 +198,7 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin } } - protected boolean checkHealth(String addressStr, HostMapping hostMapping) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff((int) (timeout * 2)); + protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { try { return doCheckHealth(backOffer, addressStr, hostMapping); } catch (Exception e) { diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e2530534807..384741d3d56 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -498,11 +498,11 @@ private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) { return true; } - synchronized boolean createFollowerClientWrapper(String followerUrlStr, String leaderUrls) { + synchronized boolean createFollowerClientWrapper(BackOffer backOffer, String followerUrlStr, String leaderUrls) { // TODO: Why not strip protocol info on server side since grpc does not need it try { - if (!checkHealth(followerUrlStr, hostMapping)) { + if (!checkHealth(backOffer, followerUrlStr, hostMapping)) { return false; } @@ -535,7 +535,7 @@ public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // if leader is switched, just return. - if (checkHealth(leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { lastUpdateLeaderTime = System.currentTimeMillis(); return; } @@ -562,7 +562,7 @@ public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { hasReachNextMember = true; continue; } - if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) { + if (hasReachNextMember && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { logger.warn( String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr)); return; @@ -592,7 +592,7 @@ public void tryUpdateLeader() { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // If leader is not change but becomes available, we can cancel follower forward. - if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + if (checkHealth(defaultBackOffer(), leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { if (!urls.equals(this.pdAddrs)) { tryUpdateMembers(urls); } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 08018d0d495..edd3a19f444 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import org.junit.After; import org.junit.Before; import org.tikv.common.TiConfiguration.ApiVersion; @@ -51,6 +52,16 @@ void upgradeToV2Cluster() throws Exception { session = TiSession.create(conf); } + void updateConf(Function update) throws Exception { + if (session == null) { + throw new IllegalStateException("Cluster is not initialized"); + } + + session.close(); + + session = TiSession.create(update.apply(session.getConf())); + } + void setup(String addr) throws IOException { int[] ports = new int[3]; for (int i = 0; i < ports.length; i++) { diff --git a/src/test/java/org/tikv/common/TimeoutTest.java b/src/test/java/org/tikv/common/TimeoutTest.java new file mode 100644 index 00000000000..0ebb91d6a2e --- /dev/null +++ b/src/test/java/org/tikv/common/TimeoutTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.tikv.raw.RawKVClient; + +public class TimeoutTest extends MockThreeStoresTest { + @Before + public void init() throws Exception { + updateConf(conf -> { + conf.setEnableAtomicForCAS(true); + conf.setTimeout(150); + conf.setForwardTimeout(200); + conf.setRawKVReadTimeoutInMS(400); + conf.setRawKVWriteTimeoutInMS(400); + conf.setRawKVBatchReadTimeoutInMS(400); + conf.setRawKVBatchWriteTimeoutInMS(400); + conf.setRawKVWriteSlowLogInMS(50); + conf.setRawKVReadSlowLogInMS(50); + conf.setRawKVBatchReadSlowLogInMS(50); + conf.setRawKVBatchWriteSlowLogInMS(50); + return conf; + }); + } + + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testTimeoutInTime() { + try (RawKVClient client = createClient()) { + pdServers.get(0).stop(); + long start = System.currentTimeMillis(); + client.get(ByteString.copyFromUtf8("key")); + long end = System.currentTimeMillis(); + Assert.assertTrue(end - start < 500); + } + } +} From c839b54abb071ca2ca60e2863d99d2dc067f58aa Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 27 Jul 2022 17:03:58 +0800 Subject: [PATCH 3/5] ./dev/javafmt Signed-off-by: iosmanthus --- .../org/tikv/common/AbstractGRPCClient.java | 1 - src/main/java/org/tikv/common/PDClient.java | 12 +++++--- .../java/org/tikv/common/TimeoutTest.java | 29 ++++++++++--------- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index e23ae9be9d8..d2675f94385 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -40,7 +40,6 @@ import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; -import org.tikv.common.util.ConcreteBackOffer; public abstract class AbstractGRPCClient< BlockingStubT extends AbstractStub, diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 384741d3d56..5a4ebd94062 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -498,7 +498,8 @@ private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) { return true; } - synchronized boolean createFollowerClientWrapper(BackOffer backOffer, String followerUrlStr, String leaderUrls) { + synchronized boolean createFollowerClientWrapper( + BackOffer backOffer, String followerUrlStr, String leaderUrls) { // TODO: Why not strip protocol info on server side since grpc does not need it try { @@ -535,7 +536,8 @@ public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // if leader is switched, just return. - if (checkHealth(backOffer, leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) + && createLeaderClientWrapper(leaderUrlStr)) { lastUpdateLeaderTime = System.currentTimeMillis(); return; } @@ -562,7 +564,8 @@ public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { hasReachNextMember = true; continue; } - if (hasReachNextMember && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { + if (hasReachNextMember + && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { logger.warn( String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr)); return; @@ -592,7 +595,8 @@ public void tryUpdateLeader() { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // If leader is not change but becomes available, we can cancel follower forward. - if (checkHealth(defaultBackOffer(), leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + if (checkHealth(defaultBackOffer(), leaderUrlStr, hostMapping) + && trySwitchLeader(leaderUrlStr)) { if (!urls.equals(this.pdAddrs)) { tryUpdateMembers(urls); } diff --git a/src/test/java/org/tikv/common/TimeoutTest.java b/src/test/java/org/tikv/common/TimeoutTest.java index 0ebb91d6a2e..056ef471547 100644 --- a/src/test/java/org/tikv/common/TimeoutTest.java +++ b/src/test/java/org/tikv/common/TimeoutTest.java @@ -26,20 +26,21 @@ public class TimeoutTest extends MockThreeStoresTest { @Before public void init() throws Exception { - updateConf(conf -> { - conf.setEnableAtomicForCAS(true); - conf.setTimeout(150); - conf.setForwardTimeout(200); - conf.setRawKVReadTimeoutInMS(400); - conf.setRawKVWriteTimeoutInMS(400); - conf.setRawKVBatchReadTimeoutInMS(400); - conf.setRawKVBatchWriteTimeoutInMS(400); - conf.setRawKVWriteSlowLogInMS(50); - conf.setRawKVReadSlowLogInMS(50); - conf.setRawKVBatchReadSlowLogInMS(50); - conf.setRawKVBatchWriteSlowLogInMS(50); - return conf; - }); + updateConf( + conf -> { + conf.setEnableAtomicForCAS(true); + conf.setTimeout(150); + conf.setForwardTimeout(200); + conf.setRawKVReadTimeoutInMS(400); + conf.setRawKVWriteTimeoutInMS(400); + conf.setRawKVBatchReadTimeoutInMS(400); + conf.setRawKVBatchWriteTimeoutInMS(400); + conf.setRawKVWriteSlowLogInMS(50); + conf.setRawKVReadSlowLogInMS(50); + conf.setRawKVBatchReadSlowLogInMS(50); + conf.setRawKVBatchWriteSlowLogInMS(50); + return conf; + }); } private RawKVClient createClient() { From 92494a7d2701a9b284a9746706705e1df81840ff Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 27 Jul 2022 17:15:26 +0800 Subject: [PATCH 4/5] fix unstable test Signed-off-by: iosmanthus --- src/test/java/org/tikv/common/TimeoutTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/tikv/common/TimeoutTest.java b/src/test/java/org/tikv/common/TimeoutTest.java index 056ef471547..e0f3d01d1ce 100644 --- a/src/test/java/org/tikv/common/TimeoutTest.java +++ b/src/test/java/org/tikv/common/TimeoutTest.java @@ -54,7 +54,7 @@ public void testTimeoutInTime() { long start = System.currentTimeMillis(); client.get(ByteString.copyFromUtf8("key")); long end = System.currentTimeMillis(); - Assert.assertTrue(end - start < 500); + Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L); } } } From 95f9b9261d33f00a98fd91f5c1fcdc9fdbed0957 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 27 Jul 2022 19:54:53 +0800 Subject: [PATCH 5/5] unify default backoffer in tryUpdateLeader Signed-off-by: iosmanthus --- src/main/java/org/tikv/common/PDClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 5a4ebd94062..695a1565b1c 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -581,8 +581,9 @@ && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { public void tryUpdateLeader() { for (URI url : this.pdAddrs) { + BackOffer backOffer = defaultBackOffer(); // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(defaultBackOffer(), url); + GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { continue; } @@ -595,8 +596,7 @@ public void tryUpdateLeader() { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // If leader is not change but becomes available, we can cancel follower forward. - if (checkHealth(defaultBackOffer(), leaderUrlStr, hostMapping) - && trySwitchLeader(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { if (!urls.equals(this.pdAddrs)) { tryUpdateMembers(urls); }