diff --git a/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java b/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java index e230217e435..6e6d1732cfd 100644 --- a/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java +++ b/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java @@ -36,32 +36,36 @@ public class SwitchTiKVModeClient { private final PDClient pdClient; private final ImporterStoreClient.ImporterStoreClientBuilder builder; - private final ScheduledExecutorService ingestScheduledExecutorService; + private ScheduledExecutorService ingestScheduledExecutorService; public SwitchTiKVModeClient( PDClient pdClient, ImporterStoreClient.ImporterStoreClientBuilder builder) { this.pdClient = pdClient; this.builder = builder; - - this.ingestScheduledExecutorService = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("switch-tikv-mode-pool-%d") - .setDaemon(true) - .build()); } public void switchTiKVToNormalMode() { doSwitchTiKVMode(ImportSstpb.SwitchMode.Normal); } - public void keepTiKVToImportMode() { - ingestScheduledExecutorService.scheduleAtFixedRate( - this::switchTiKVToImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS); + public synchronized void keepTiKVToImportMode() { + if (ingestScheduledExecutorService == null) { + ingestScheduledExecutorService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("switch-tikv-mode-pool-%d") + .setDaemon(true) + .build()); + ingestScheduledExecutorService.scheduleAtFixedRate( + this::switchTiKVToImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS); + } } - public void stopKeepTiKVToImportMode() { - ingestScheduledExecutorService.shutdown(); + public synchronized void stopKeepTiKVToImportMode() { + if (ingestScheduledExecutorService != null) { + ingestScheduledExecutorService.shutdown(); + ingestScheduledExecutorService = null; + } } private void switchTiKVToImportMode() { diff --git a/src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java b/src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java index ad5a8a69cbd..c0bbaf57604 100644 --- a/src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java +++ b/src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java @@ -26,9 +26,11 @@ public void tearDown() throws Exception { @Test public void switchTiKVModeTest() throws InterruptedException { SwitchTiKVModeClient switchTiKVModeClient = session.getSwitchTiKVModeClient(); - switchTiKVModeClient.keepTiKVToImportMode(); - Thread.sleep(6000); - switchTiKVModeClient.stopKeepTiKVToImportMode(); - switchTiKVModeClient.switchTiKVToNormalMode(); + for (int i = 0; i < 2; i++) { + switchTiKVModeClient.keepTiKVToImportMode(); + Thread.sleep(6000); + switchTiKVModeClient.stopKeepTiKVToImportMode(); + switchTiKVModeClient.switchTiKVToNormalMode(); + } } }