diff --git a/src/main/java/org/tikv/common/util/ChannelFactory.java b/src/main/java/org/tikv/common/util/ChannelFactory.java index 20840af7eb5..effda5cf467 100644 --- a/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -64,7 +64,7 @@ public class ChannelFactory implements AutoCloseable { private final AtomicReference sslContextBuilder = new AtomicReference<>(); - private final ScheduledExecutorService recycler = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService recycler; private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -209,6 +209,7 @@ public ChannelFactory( this.idleTimeout = idleTimeout; this.certWatcher = null; this.certContext = null; + this.recycler = null; this.connRecycleTime = 0; } @@ -229,6 +230,7 @@ public ChannelFactory( this.connRecycleTime = connRecycleTime; this.certContext = new OpenSslContext(trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath); + this.recycler = Executors.newSingleThreadScheduledExecutor(); File trustCert = new File(trustCertCollectionFilePath); File keyCert = new File(keyCertChainFilePath); @@ -261,6 +263,7 @@ public ChannelFactory( this.idleTimeout = idleTimeout; this.connRecycleTime = connRecycleTime; this.certContext = new JksContext(jksKeyPath, jksKeyPassword, jksTrustPath, jksTrustPassword); + this.recycler = Executors.newSingleThreadScheduledExecutor(); File jksKey = new File(jksKeyPath); File jksTrust = new File(jksTrustPath); @@ -360,11 +363,12 @@ public void close() { } connPool.clear(); - if (certContext != null) { + if (recycler != null) { recycler.shutdown(); - if (certWatcher != null) { - certWatcher.close(); - } + } + + if (certWatcher != null) { + certWatcher.close(); } } } diff --git a/src/test/java/org/tikv/common/ChannelFactoryTest.java b/src/test/java/org/tikv/common/ChannelFactoryTest.java index ce071a69653..19155841598 100644 --- a/src/test/java/org/tikv/common/ChannelFactoryTest.java +++ b/src/test/java/org/tikv/common/ChannelFactoryTest.java @@ -55,24 +55,28 @@ public void testCertWatcher() throws InterruptedException { File a = new File(caPath); File b = new File(clientCertPath); File c = new File(clientKeyPath); - new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true)); - Thread.sleep(5000); - assertTrue(changed.get()); + try (CertWatcher certWatcher = + new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true))) { + Thread.sleep(5000); + assertTrue(changed.get()); + } } @Test public void testCertWatcherWithExceptionTask() throws InterruptedException { AtomicInteger timesOfReloadTask = new AtomicInteger(0); - new CertWatcher( - 1, - ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)), - () -> { - timesOfReloadTask.getAndIncrement(); - touchCert(); - throw new RuntimeException("Mock exception in reload task"); - }); + CertWatcher certWatcher = + new CertWatcher( + 1, + ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)), + () -> { + timesOfReloadTask.getAndIncrement(); + touchCert(); + throw new RuntimeException("Mock exception in reload task"); + }); Thread.sleep(5000); + certWatcher.close(); assertTrue(timesOfReloadTask.get() > 1); }