From 7c7cca6c111b27b602da2dd65c55351bdaa28576 Mon Sep 17 00:00:00 2001 From: Daemonxiao <35677990+Daemonxiao@users.noreply.github.com> Date: Wed, 22 Jun 2022 13:19:59 +0800 Subject: [PATCH] cherry pick #618 to release-3.3 Signed-off-by: ti-srebot --- .../org/tikv/common/util/ChannelFactory.java | 14 ++++++---- .../org/tikv/common/ChannelFactoryTest.java | 26 +++++++++++-------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/tikv/common/util/ChannelFactory.java b/src/main/java/org/tikv/common/util/ChannelFactory.java index 20840af7eb5..effda5cf467 100644 --- a/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -64,7 +64,7 @@ public class ChannelFactory implements AutoCloseable { private final AtomicReference sslContextBuilder = new AtomicReference<>(); - private final ScheduledExecutorService recycler = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService recycler; private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -209,6 +209,7 @@ public ChannelFactory( this.idleTimeout = idleTimeout; this.certWatcher = null; this.certContext = null; + this.recycler = null; this.connRecycleTime = 0; } @@ -229,6 +230,7 @@ public ChannelFactory( this.connRecycleTime = connRecycleTime; this.certContext = new OpenSslContext(trustCertCollectionFilePath, keyCertChainFilePath, keyFilePath); + this.recycler = Executors.newSingleThreadScheduledExecutor(); File trustCert = new File(trustCertCollectionFilePath); File keyCert = new File(keyCertChainFilePath); @@ -261,6 +263,7 @@ public ChannelFactory( this.idleTimeout = idleTimeout; this.connRecycleTime = connRecycleTime; this.certContext = new JksContext(jksKeyPath, jksKeyPassword, jksTrustPath, jksTrustPassword); + this.recycler = Executors.newSingleThreadScheduledExecutor(); File jksKey = new File(jksKeyPath); File jksTrust = new File(jksTrustPath); @@ -360,11 +363,12 @@ public void close() { } connPool.clear(); - if (certContext != null) { + if (recycler != null) { recycler.shutdown(); - if (certWatcher != null) { - certWatcher.close(); - } + } + + if (certWatcher != null) { + certWatcher.close(); } } } diff --git a/src/test/java/org/tikv/common/ChannelFactoryTest.java b/src/test/java/org/tikv/common/ChannelFactoryTest.java index ce071a69653..19155841598 100644 --- a/src/test/java/org/tikv/common/ChannelFactoryTest.java +++ b/src/test/java/org/tikv/common/ChannelFactoryTest.java @@ -55,24 +55,28 @@ public void testCertWatcher() throws InterruptedException { File a = new File(caPath); File b = new File(clientCertPath); File c = new File(clientKeyPath); - new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true)); - Thread.sleep(5000); - assertTrue(changed.get()); + try (CertWatcher certWatcher = + new CertWatcher(2, ImmutableList.of(a, b, c), () -> changed.set(true))) { + Thread.sleep(5000); + assertTrue(changed.get()); + } } @Test public void testCertWatcherWithExceptionTask() throws InterruptedException { AtomicInteger timesOfReloadTask = new AtomicInteger(0); - new CertWatcher( - 1, - ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)), - () -> { - timesOfReloadTask.getAndIncrement(); - touchCert(); - throw new RuntimeException("Mock exception in reload task"); - }); + CertWatcher certWatcher = + new CertWatcher( + 1, + ImmutableList.of(new File(caPath), new File(clientCertPath), new File(clientKeyPath)), + () -> { + timesOfReloadTask.getAndIncrement(); + touchCert(); + throw new RuntimeException("Mock exception in reload task"); + }); Thread.sleep(5000); + certWatcher.close(); assertTrue(timesOfReloadTask.get() > 1); }