From 4f14503e46e23f11a001ce2516420ae626288f91 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Thu, 16 Sep 2021 19:33:23 +0800 Subject: [PATCH 01/11] PDClient: Add function to call pause checker API Signed-off-by: Peng Guanwen --- pom.xml | 5 ++ src/main/java/org/tikv/common/PDChecker.java | 31 +++++++++++ src/main/java/org/tikv/common/PDClient.java | 53 +++++++++++++++++++ .../java/org/tikv/common/PDClientTest.java | 9 ++++ 4 files changed, 98 insertions(+) create mode 100644 src/main/java/org/tikv/common/PDChecker.java diff --git a/pom.xml b/pom.xml index 09b8374f63b..964c1bf2543 100644 --- a/pom.xml +++ b/pom.xml @@ -196,6 +196,11 @@ 3.9 compile + + org.apache.httpcomponents + httpclient + 4.5.13 + io.prometheus simpleclient diff --git a/src/main/java/org/tikv/common/PDChecker.java b/src/main/java/org/tikv/common/PDChecker.java new file mode 100644 index 00000000000..a64da7100d0 --- /dev/null +++ b/src/main/java/org/tikv/common/PDChecker.java @@ -0,0 +1,31 @@ +package org.tikv.common; + +public enum PDChecker { + Learner, + Replica, + Rule, + Split, + Merge, + JointState, + Priority; + + public String apiName() { + switch (this) { + case Learner: + return "learner"; + case Replica: + return "replica"; + case Rule: + return "rule"; + case Split: + return "split"; + case Merge: + return "merge"; + case JointState: + return "joint-state"; + case Priority: + return "priority"; + } + throw new IllegalArgumentException(); + } +} diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e89f1b51fa2..9f114a63755 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -21,6 +21,7 @@ import static org.tikv.common.pd.PDUtils.addrToUri; import static org.tikv.common.pd.PDUtils.uriToAddr; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; @@ -35,6 +36,7 @@ import io.prometheus.client.Histogram; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -46,6 +48,11 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.codec.Codec.BytesCodec; @@ -93,12 +100,15 @@ public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync"; private static final long MIN_TRY_UPDATE_DURATION = 50; + private static final int PAUSE_CHECKER_TIMEOUT = 600; + private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; private final Logger logger = LoggerFactory.getLogger(PDClient.class); private RequestHeader header; private TsoRequest tsoReq; private volatile PDClientWrapper pdClientWrapper; private ScheduledExecutorService service; private ScheduledExecutorService tiflashReplicaService; + private HashMap pauseCheckerService; private List pdAddrs; private Client etcdClient; private ConcurrentMap tiflashReplicaMap; @@ -144,6 +154,49 @@ public TiTimestamp getTimestamp(BackOffer backOffer) { return new TiTimestamp(timestamp.getPhysical(), timestamp.getLogical()); } + public void keepPauseChecker(PDChecker checker) { + if (!this.pauseCheckerService.containsKey(checker)) { + ScheduledExecutorService newService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat(String.format("PDClient-pause-%s-pool-%%d", checker.name())) + .setDaemon(true) + .build()); + this.pauseCheckerService.put(checker, newService); + } + this.pauseCheckerService + .get(checker) + .scheduleAtFixedRate( + () -> pauseChecker(checker), 0, KEEP_CHECKER_PAUSE_PERIOD, TimeUnit.SECONDS); + } + + public void stopKeepPauseChecker(PDChecker checker) { + if (this.pauseCheckerService.containsKey(checker)) { + this.pauseCheckerService.get(checker).shutdown(); + } + } + + private void pauseChecker(PDChecker checker) { + URI url = pdAddrs.get(0); + String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName(); + JsonMapper jsonMapper = new JsonMapper(); + HashMap arguments = new HashMap<>(); + arguments.put("delay", PAUSE_CHECKER_TIMEOUT); + try (CloseableHttpClient client = HttpClients.createDefault()) { + byte[] body = jsonMapper.writeValueAsBytes(arguments); + HttpPost post = new HttpPost(api); + post.setEntity(new ByteArrayEntity(body)); + try (CloseableHttpResponse resp = client.execute(post)) { + if (resp.getStatusLine().getStatusCode() != 200) { + logger.error("failed to pause checker."); + } + logger.info("checker {} paused", checker.apiName()); + } + } catch (Exception e) { + logger.error("failed to pause checker.", e); + } + } + /** * Sends request to pd to scatter region. * diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index d26f09e436d..26018e6666f 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -68,6 +68,15 @@ public void testTso() throws Exception { } } + @Test + public void testPauseCheck() throws Exception { + try (PDClient client = session.getPDClient()) { + client.keepPauseChecker(PDChecker.Merge); + Thread.sleep(6000); + client.stopKeepPauseChecker(PDChecker.Merge); + } + } + @Test public void testGetRegionByKey() throws Exception { byte[] startKey = new byte[] {1, 0, 2, 4}; From 9e26995a2c94e9179aeedd5a9bdfb350642acbb7 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Thu, 16 Sep 2021 20:26:29 +0800 Subject: [PATCH 02/11] bugfix Signed-off-by: Peng Guanwen --- src/main/java/org/tikv/common/PDClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 9f114a63755..46c10cf2e57 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -100,15 +100,15 @@ public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync"; private static final long MIN_TRY_UPDATE_DURATION = 50; - private static final int PAUSE_CHECKER_TIMEOUT = 600; - private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; + private static final int PAUSE_CHECKER_TIMEOUT = 300; // in seconds + private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; // in seconds private final Logger logger = LoggerFactory.getLogger(PDClient.class); private RequestHeader header; private TsoRequest tsoReq; private volatile PDClientWrapper pdClientWrapper; private ScheduledExecutorService service; private ScheduledExecutorService tiflashReplicaService; - private HashMap pauseCheckerService; + private final HashMap pauseCheckerService = new HashMap<>(); private List pdAddrs; private Client etcdClient; private ConcurrentMap tiflashReplicaMap; From 83d74d4a8c4ece0330e7de2116898cb650a503d7 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Fri, 17 Sep 2021 10:13:44 +0800 Subject: [PATCH 03/11] Add resumeChecker Signed-off-by: Peng Guanwen --- src/main/java/org/tikv/common/PDClient.java | 20 +++++++++++++------ .../java/org/tikv/common/PDClientTest.java | 9 ++++++--- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 46c10cf2e57..ff696f6de47 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -167,7 +167,10 @@ public void keepPauseChecker(PDChecker checker) { this.pauseCheckerService .get(checker) .scheduleAtFixedRate( - () -> pauseChecker(checker), 0, KEEP_CHECKER_PAUSE_PERIOD, TimeUnit.SECONDS); + () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT), + 0, + KEEP_CHECKER_PAUSE_PERIOD, + TimeUnit.SECONDS); } public void stopKeepPauseChecker(PDChecker checker) { @@ -176,24 +179,29 @@ public void stopKeepPauseChecker(PDChecker checker) { } } - private void pauseChecker(PDChecker checker) { + public void resumeChecker(PDChecker checker) { + pauseChecker(checker, 0); + } + + private void pauseChecker(PDChecker checker, int timeout) { + String verb = timeout == 0 ? "resume" : "pause"; URI url = pdAddrs.get(0); String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName(); JsonMapper jsonMapper = new JsonMapper(); HashMap arguments = new HashMap<>(); - arguments.put("delay", PAUSE_CHECKER_TIMEOUT); + arguments.put("delay", timeout); try (CloseableHttpClient client = HttpClients.createDefault()) { byte[] body = jsonMapper.writeValueAsBytes(arguments); HttpPost post = new HttpPost(api); post.setEntity(new ByteArrayEntity(body)); try (CloseableHttpResponse resp = client.execute(post)) { if (resp.getStatusLine().getStatusCode() != 200) { - logger.error("failed to pause checker."); + logger.error("failed to {} checker.", verb); } - logger.info("checker {} paused", checker.apiName()); + logger.info("checker {} {}d", checker.apiName(), verb); } } catch (Exception e) { - logger.error("failed to pause checker.", e); + logger.error(String.format("failed to %s checker.", verb), e); } } diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index 26018e6666f..f030978e38d 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -71,9 +71,12 @@ public void testTso() throws Exception { @Test public void testPauseCheck() throws Exception { try (PDClient client = session.getPDClient()) { - client.keepPauseChecker(PDChecker.Merge); - Thread.sleep(6000); - client.stopKeepPauseChecker(PDChecker.Merge); + PDChecker checker = PDChecker.Merge; + client.keepPauseChecker(checker); + Thread.sleep(1000); + client.stopKeepPauseChecker(checker); + Thread.sleep(1000); + client.resumeChecker(checker); } } From 4415fcb9e2ac97422f45f530ac614e8319666833 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Fri, 17 Sep 2021 12:51:13 +0800 Subject: [PATCH 04/11] Add isCheckerPaused Signed-off-by: Peng Guanwen --- src/main/java/org/tikv/common/PDClient.java | 19 ++++++++++++++++++- .../java/org/tikv/common/PDClientTest.java | 4 ++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index ff696f6de47..4fbdf0d94fd 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -21,6 +21,8 @@ import static org.tikv.common.pd.PDUtils.addrToUri; import static org.tikv.common.pd.PDUtils.uriToAddr; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -35,6 +37,7 @@ import io.grpc.stub.MetadataUtils; import io.prometheus.client.Histogram; import java.net.URI; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; @@ -187,10 +190,10 @@ private void pauseChecker(PDChecker checker, int timeout) { String verb = timeout == 0 ? "resume" : "pause"; URI url = pdAddrs.get(0); String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName(); - JsonMapper jsonMapper = new JsonMapper(); HashMap arguments = new HashMap<>(); arguments.put("delay", timeout); try (CloseableHttpClient client = HttpClients.createDefault()) { + JsonMapper jsonMapper = new JsonMapper(); byte[] body = jsonMapper.writeValueAsBytes(arguments); HttpPost post = new HttpPost(api); post.setEntity(new ByteArrayEntity(body)); @@ -205,6 +208,20 @@ private void pauseChecker(PDChecker checker, int timeout) { } } + public Boolean isCheckerPaused(PDChecker checker) { + URI url = pdAddrs.get(0); + String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName(); + try { + ObjectMapper mapper = new ObjectMapper(); + HashMap status = + mapper.readValue(new URL(api), new TypeReference>() {}); + return status.get("paused"); + } catch (Exception e) { + logger.error(String.format("failed to get %s checker status.", checker.apiName()), e); + return null; + } + } + /** * Sends request to pd to scatter region. * diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index f030978e38d..40e42e91078 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -74,9 +74,13 @@ public void testPauseCheck() throws Exception { PDChecker checker = PDChecker.Merge; client.keepPauseChecker(checker); Thread.sleep(1000); + assertTrue(client.isCheckerPaused(checker)); + client.stopKeepPauseChecker(checker); Thread.sleep(1000); + client.resumeChecker(checker); + assertFalse(client.isCheckerPaused(checker)); } } From 2e46d07199ef30a1fc72c9e6821af1c1b6983930 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Fri, 17 Sep 2021 16:14:28 +0800 Subject: [PATCH 05/11] Refine test Co-authored-by: Liangliang Gu Signed-off-by: Peng Guanwen --- .../java/org/tikv/common/PDClientTest.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index 40e42e91078..bd993f8fca8 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -72,15 +72,17 @@ public void testTso() throws Exception { public void testPauseCheck() throws Exception { try (PDClient client = session.getPDClient()) { PDChecker checker = PDChecker.Merge; - client.keepPauseChecker(checker); - Thread.sleep(1000); - assertTrue(client.isCheckerPaused(checker)); - - client.stopKeepPauseChecker(checker); - Thread.sleep(1000); - - client.resumeChecker(checker); - assertFalse(client.isCheckerPaused(checker)); + for(int i = 0; i < 2; i ++) { + client.keepPauseChecker(checker); + Thread.sleep(1000); + assertTrue(client.isCheckerPaused(checker)); + + client.stopKeepPauseChecker(checker); + Thread.sleep(1000); + + client.resumeChecker(checker); + assertFalse(client.isCheckerPaused(checker)); + } } } From 34bad19c8870b62e76212d5a1c831bda6015c3bf Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Wed, 22 Sep 2021 09:47:34 +0800 Subject: [PATCH 06/11] Reformat code Signed-off-by: Peng Guanwen --- src/test/java/org/tikv/common/PDClientTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index bd993f8fca8..210c7a19ebb 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -72,14 +72,14 @@ public void testTso() throws Exception { public void testPauseCheck() throws Exception { try (PDClient client = session.getPDClient()) { PDChecker checker = PDChecker.Merge; - for(int i = 0; i < 2; i ++) { + for (int i = 0; i < 2; i++) { client.keepPauseChecker(checker); Thread.sleep(1000); assertTrue(client.isCheckerPaused(checker)); - + client.stopKeepPauseChecker(checker); Thread.sleep(1000); - + client.resumeChecker(checker); assertFalse(client.isCheckerPaused(checker)); } From 430654bbcd55ab89dc61e5f3c8974b2e0a809f00 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Fri, 24 Sep 2021 16:31:17 +0800 Subject: [PATCH 07/11] Remove scheduler after shutdown Signed-off-by: Peng Guanwen --- src/main/java/org/tikv/common/PDClient.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 4fbdf0d94fd..10063a92ee3 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -165,20 +165,20 @@ public void keepPauseChecker(PDChecker checker) { .setNameFormat(String.format("PDClient-pause-%s-pool-%%d", checker.name())) .setDaemon(true) .build()); + newService + .scheduleAtFixedRate( + () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT), + 0, + KEEP_CHECKER_PAUSE_PERIOD, + TimeUnit.SECONDS); this.pauseCheckerService.put(checker, newService); } - this.pauseCheckerService - .get(checker) - .scheduleAtFixedRate( - () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT), - 0, - KEEP_CHECKER_PAUSE_PERIOD, - TimeUnit.SECONDS); } public void stopKeepPauseChecker(PDChecker checker) { if (this.pauseCheckerService.containsKey(checker)) { this.pauseCheckerService.get(checker).shutdown(); + this.pauseCheckerService.remove(checker); } } From 6565d3850901b63352ada0f60719f588a4f13863 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Fri, 24 Sep 2021 16:40:04 +0800 Subject: [PATCH 08/11] Reformat code Signed-off-by: Peng Guanwen --- src/main/java/org/tikv/common/PDClient.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 10063a92ee3..56e2913124f 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -165,12 +165,11 @@ public void keepPauseChecker(PDChecker checker) { .setNameFormat(String.format("PDClient-pause-%s-pool-%%d", checker.name())) .setDaemon(true) .build()); - newService - .scheduleAtFixedRate( - () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT), - 0, - KEEP_CHECKER_PAUSE_PERIOD, - TimeUnit.SECONDS); + newService.scheduleAtFixedRate( + () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT), + 0, + KEEP_CHECKER_PAUSE_PERIOD, + TimeUnit.SECONDS); this.pauseCheckerService.put(checker, newService); } } From 7cff0dafa09de93933b977136fef4b4f22932a32 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Tue, 28 Sep 2021 10:50:52 +0800 Subject: [PATCH 09/11] move pause test to integration test Signed-off-by: Peng Guanwen --- .../tikv/common/PDClientIntegrationTest.java | 44 +++++++++++++++++++ .../java/org/tikv/common/PDClientTest.java | 19 -------- 2 files changed, 44 insertions(+), 19 deletions(-) create mode 100644 src/test/java/org/tikv/common/PDClientIntegrationTest.java diff --git a/src/test/java/org/tikv/common/PDClientIntegrationTest.java b/src/test/java/org/tikv/common/PDClientIntegrationTest.java new file mode 100644 index 00000000000..18d1381ee0d --- /dev/null +++ b/src/test/java/org/tikv/common/PDClientIntegrationTest.java @@ -0,0 +1,44 @@ +package org.tikv.common; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class PDClientIntegrationTest { + private TiSession session; + + @Before + public void setup() { + TiConfiguration conf = TiConfiguration.createRawDefault(); + conf.setTest(true); + session = TiSession.create(conf); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void testPauseCheck() throws Exception { + try (PDClient client = session.getPDClient()) { + PDChecker checker = PDChecker.Merge; + for (int i = 0; i < 2; i++) { + client.keepPauseChecker(checker); + Thread.sleep(1000); + assertTrue(client.isCheckerPaused(checker)); + + client.stopKeepPauseChecker(checker); + Thread.sleep(1000); + + client.resumeChecker(checker); + assertFalse(client.isCheckerPaused(checker)); + } + } + } +} diff --git a/src/test/java/org/tikv/common/PDClientTest.java b/src/test/java/org/tikv/common/PDClientTest.java index 7e18ecac965..f22d646564f 100644 --- a/src/test/java/org/tikv/common/PDClientTest.java +++ b/src/test/java/org/tikv/common/PDClientTest.java @@ -16,7 +16,6 @@ package org.tikv.common; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.tikv.common.GrpcUtils.encodeKey; @@ -78,24 +77,6 @@ public void testTso() throws Exception { } } - @Test - public void testPauseCheck() throws Exception { - try (PDClient client = session.getPDClient()) { - PDChecker checker = PDChecker.Merge; - for (int i = 0; i < 2; i++) { - client.keepPauseChecker(checker); - Thread.sleep(1000); - assertTrue(client.isCheckerPaused(checker)); - - client.stopKeepPauseChecker(checker); - Thread.sleep(1000); - - client.resumeChecker(checker); - assertFalse(client.isCheckerPaused(checker)); - } - } - } - @Test public void testGetRegionByKey() throws Exception { byte[] startKey = new byte[] {1, 0, 2, 4}; From 85479ef1dabc18d4cd15d2de287680cc2bca5ba5 Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Tue, 28 Sep 2021 10:55:02 +0800 Subject: [PATCH 10/11] Reformat code Signed-off-by: Peng Guanwen --- src/test/java/org/tikv/common/PDClientIntegrationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/tikv/common/PDClientIntegrationTest.java b/src/test/java/org/tikv/common/PDClientIntegrationTest.java index 18d1381ee0d..e76ab08cb5d 100644 --- a/src/test/java/org/tikv/common/PDClientIntegrationTest.java +++ b/src/test/java/org/tikv/common/PDClientIntegrationTest.java @@ -1,12 +1,12 @@ package org.tikv.common; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - public class PDClientIntegrationTest { private TiSession session; From a9733280ee8fd9feb587c183289af8e613fd6abc Mon Sep 17 00:00:00 2001 From: Peng Guanwen Date: Tue, 28 Sep 2021 11:24:13 +0800 Subject: [PATCH 11/11] Add synchronized to keepPauseChecker Signed-off-by: Peng Guanwen --- src/main/java/org/tikv/common/PDClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 9dd34cc8670..d86363f3c66 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -157,7 +157,7 @@ public TiTimestamp getTimestamp(BackOffer backOffer) { return new TiTimestamp(timestamp.getPhysical(), timestamp.getLogical()); } - public void keepPauseChecker(PDChecker checker) { + public synchronized void keepPauseChecker(PDChecker checker) { if (!this.pauseCheckerService.containsKey(checker)) { ScheduledExecutorService newService = Executors.newSingleThreadScheduledExecutor( @@ -174,7 +174,7 @@ public void keepPauseChecker(PDChecker checker) { } } - public void stopKeepPauseChecker(PDChecker checker) { + public synchronized void stopKeepPauseChecker(PDChecker checker) { if (this.pauseCheckerService.containsKey(checker)) { this.pauseCheckerService.get(checker).shutdown(); this.pauseCheckerService.remove(checker);