From 9ee0de82d882e3ad3987c816c27289b5e96f4bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Tue, 7 Jul 2020 21:59:23 +0800 Subject: [PATCH 1/9] [Thread Resource Leak]Fix thread resource leak after checkpoint catalog destroyed --- .../org/apache/doris/catalog/Catalog.java | 3 +- .../doris/common/ThreadPoolManager.java | 11 +++- .../common/publish/FixedTimePublisher.java | 2 +- .../org/apache/doris/load/ExportChecker.java | 7 ++- .../org/apache/doris/load/LoadChecker.java | 13 +++-- .../org/apache/doris/metric/MetricRepo.java | 2 +- .../org/apache/doris/qe/ConnectScheduler.java | 2 +- .../apache/doris/task/MasterTaskExecutor.java | 56 +++++++++---------- .../doris/task/MasterTaskExecutorTest.java | 3 +- 9 files changed, 57 insertions(+), 42 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 0c0251bcdde292..368742be3085be 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -524,7 +524,7 @@ private Catalog() { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor(Config.async_load_task_pool_size); + this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); @@ -1247,6 +1247,7 @@ private void startMasterOnlyDaemonThreads() { LoadChecker.init(Config.load_checker_interval_second * 1000L); LoadChecker.startAll(); // New load scheduler + loadTaskScheduler.start(); loadManager.prepareJobs(); loadJobScheduler.start(); loadTimeoutChecker.start(); diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index c177cf7b1822a6..34deaf11e2de08 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -39,7 +39,7 @@ /** * ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource. * thread names in thread pool are formatted as poolName-ID, where ID is a unique, sequentially assigned integer. - * it provide three functions to construct thread pool now. + * it provide for functions to construct thread pool now. * * 1. newDaemonCacheThreadPool * Wrapper over newCachedThreadPool with additional maxNumThread limit. @@ -47,6 +47,8 @@ * Wrapper over newCachedThreadPool with additional blocking queue capacity limit. * 3. newDaemonThreadPool * Wrapper over ThreadPoolExecutor, user can use it to construct thread pool more flexibly. + * 4. newDaemonScheduledThreadPool + * Wrapper over ScheduledThreadPoolExecutor, but without thread num limit now(NOTICE). * * All thread pool constructed by ThreadPoolManager will be added to the nameToThreadPoolMap, * so the thread pool name in fe must be unique. @@ -114,9 +116,12 @@ public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, return threadPool; } - public static ScheduledThreadPoolExecutor newScheduledThreadPool(int maxNumThread, String poolName) { + // Now, we have no thread num limit in ScheduledThreadPoolExecutor, + // so it may cause oom when there are too many threads in ScheduledThreadPoolExecutor + // Please use this api with caution. + public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName) { ThreadFactory threadFactory = namedThreadFactory(poolName); - ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxNumThread, threadFactory); + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); return scheduledThreadPoolExecutor; } diff --git a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java index 658068db5c3503..c631317531ec11 100644 --- a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java +++ b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java @@ -28,7 +28,7 @@ public class FixedTimePublisher { private static FixedTimePublisher INSTANCE; - private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newScheduledThreadPool(1, "Fixed-Time-Publisher"); + private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Fixed-Time-Publisher"); private ClusterStatePublisher publisher; public FixedTimePublisher(ClusterStatePublisher publisher) { diff --git a/fe/src/main/java/org/apache/doris/load/ExportChecker.java b/fe/src/main/java/org/apache/doris/load/ExportChecker.java index 414cfbd774d63e..ccd79942279c2e 100644 --- a/fe/src/main/java/org/apache/doris/load/ExportChecker.java +++ b/fe/src/main/java/org/apache/doris/load/ExportChecker.java @@ -53,10 +53,10 @@ public static void init(long intervalMs) { checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs)); int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit; - MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor(poolSize); + MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize); executors.put(JobState.PENDING, pendingTaskExecutor); - MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor(poolSize); + MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize); executors.put(JobState.EXPORTING, exportingTaskExecutor); } @@ -64,6 +64,9 @@ public static void startAll() { for (ExportChecker exportChecker : checkers.values()) { exportChecker.start(); } + for (MasterTaskExecutor masterTaskExecutor : executors.values()) { + masterTaskExecutor.start(); + } } @Override diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/src/main/java/org/apache/doris/load/LoadChecker.java index d6e0cb7d862f7a..631d672df54e26 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java @@ -92,14 +92,14 @@ public static void init(long intervalMs) { Map pendingPriorityMap = Maps.newHashMap(); pendingPriorityMap.put(TPriority.NORMAL, - new MasterTaskExecutor(Config.load_pending_thread_num_normal_priority)); + new MasterTaskExecutor("load_pending_thread_num_normal_priority", Config.load_pending_thread_num_normal_priority)); pendingPriorityMap.put(TPriority.HIGH, - new MasterTaskExecutor(Config.load_pending_thread_num_high_priority)); + new MasterTaskExecutor("load_pending_thread_num_high_priority", Config.load_pending_thread_num_high_priority)); executors.put(JobState.PENDING, pendingPriorityMap); Map etlPriorityMap = Maps.newHashMap(); - etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor(Config.load_etl_thread_num_normal_priority)); - etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor(Config.load_etl_thread_num_high_priority)); + etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor("load_etl_thread_num_normal_priority", Config.load_etl_thread_num_normal_priority)); + etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor("load_etl_thread_num_high_priority", Config.load_etl_thread_num_high_priority)); executors.put(JobState.ETL, etlPriorityMap); } @@ -110,6 +110,11 @@ public static void startAll() { for (LoadChecker loadChecker : checkers.values()) { loadChecker.start(); } + for (Map map : executors.values()) { + for (MasterTaskExecutor masterTaskExecutor : map.values()) { + masterTaskExecutor.start(); + } + } } @Override diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 8d4139948268c8..029f5e6aa6156b 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -86,7 +86,7 @@ public final class MetricRepo { public static GaugeMetricImpl GAUGE_QUERY_ERR_RATE; public static GaugeMetricImpl GAUGE_MAX_TABLET_COMPACTION_SCORE; - private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool"); + private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool"); private static MetricCalculator metricCalculator = new MetricCalculator(); public static synchronized void init() { diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 6f4ec55c21782f..8c2e8af67ae16a 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -54,7 +54,7 @@ public class ConnectScheduler { // 1. If use a scheduler, the task maybe a huge number when query is messy. // Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler. // 2. Use a thread to poll maybe lose some accurate, but is enough to us. - private ScheduledExecutorService checkTimer = ThreadPoolManager.newScheduledThreadPool(1, + private ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Connect-Scheduler-Check-Timer"); public ConnectScheduler(int maxConnections) { diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 90aad6b5c949e5..3e5361aaa638e3 100644 --- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -20,6 +20,7 @@ import com.google.common.collect.Maps; import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.util.Daemon; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,21 +28,40 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadPoolExecutor; -public class MasterTaskExecutor { + +public class MasterTaskExecutor extends Daemon { private static final Logger LOG = LogManager.getLogger(MasterTaskExecutor.class); - private ScheduledExecutorService executor; + private ThreadPoolExecutor executor; private Map> runningTasks; - public MasterTaskExecutor(int threadNum) { - executor = ThreadPoolManager.newScheduledThreadPool(threadNum, "Master-Task-Executor-Pool"); + public MasterTaskExecutor(String name, int threadNum) { + super(name, 1000L); + executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool"); runningTasks = Maps.newHashMap(); - executor.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); } - + + @Override + protected void runOneCycle() { + try { + synchronized (runningTasks) { + Iterator>> iterator = runningTasks.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + Future future = entry.getValue(); + if (future.isDone()) { + iterator.remove(); + } + } + } + } catch (Exception e) { + LOG.error("check task error", e); + } + } + + /** * submit task to task executor * @param task @@ -70,24 +90,4 @@ public int getTaskNum() { return runningTasks.size(); } } - - private class TaskChecker implements Runnable { - @Override - public void run() { - try { - synchronized (runningTasks) { - Iterator>> iterator = runningTasks.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - Future future = entry.getValue(); - if (future.isDone()) { - iterator.remove(); - } - } - } - } catch (Exception e) { - LOG.error("check task error", e); - } - } - } } diff --git a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java index 8bc68158100bcf..3b2471a68ba2eb 100644 --- a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java +++ b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java @@ -33,7 +33,8 @@ public class MasterTaskExecutorTest { @Before public void setUp() { - executor = new MasterTaskExecutor(THREAD_NUM); + executor = new MasterTaskExecutor("master_task_executor_test", THREAD_NUM); + executor.start(); } @After From 90028eee951efceabd603e1c7e0b6737cc71506b Mon Sep 17 00:00:00 2001 From: caiconghui Date: Wed, 8 Jul 2020 00:14:54 +0800 Subject: [PATCH 2/9] refactor MasterTaskExecutor.java --- .../apache/doris/task/MasterTaskExecutor.java | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 3e5361aaa638e3..810c9cce27da58 100644 --- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -20,7 +20,6 @@ import com.google.common.collect.Maps; import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.util.Daemon; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,40 +27,28 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; -public class MasterTaskExecutor extends Daemon { +public class MasterTaskExecutor { private static final Logger LOG = LogManager.getLogger(MasterTaskExecutor.class); private ThreadPoolExecutor executor; private Map> runningTasks; + public ScheduledThreadPoolExecutor scheduledThreadPool; public MasterTaskExecutor(String name, int threadNum) { - super(name, 1000L); executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool"); runningTasks = Maps.newHashMap(); + scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool"); } - @Override - protected void runOneCycle() { - try { - synchronized (runningTasks) { - Iterator>> iterator = runningTasks.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - Future future = entry.getValue(); - if (future.isDone()) { - iterator.remove(); - } - } - } - } catch (Exception e) { - LOG.error("check task error", e); - } + public void start() { + scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); } - /** * submit task to task executor * @param task @@ -90,4 +77,24 @@ public int getTaskNum() { return runningTasks.size(); } } + + private class TaskChecker implements Runnable { + @Override + public void run() { + try { + synchronized (runningTasks) { + Iterator>> iterator = runningTasks.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + Future future = entry.getValue(); + if (future.isDone()) { + iterator.remove(); + } + } + } + } catch (Exception e) { + LOG.error("check task error", e); + } + } + } } From 4c5fa0ee14f11ddc31e06c2b70c142122cb05edd Mon Sep 17 00:00:00 2001 From: caiconghui Date: Wed, 8 Jul 2020 00:36:44 +0800 Subject: [PATCH 3/9] fix typo --- fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 34deaf11e2de08..5ae0a925c8a9af 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -39,7 +39,7 @@ /** * ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource. * thread names in thread pool are formatted as poolName-ID, where ID is a unique, sequentially assigned integer. - * it provide for functions to construct thread pool now. + * it provide four functions to construct thread pool now. * * 1. newDaemonCacheThreadPool * Wrapper over newCachedThreadPool with additional maxNumThread limit. From 3cdc6347276c7412d4645e8928c98aa22055a234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 8 Jul 2020 10:45:27 +0800 Subject: [PATCH 4/9] Remove unused file FixedTimePublisher.java --- .../common/publish/FixedTimePublisher.java | 72 ------------------- 1 file changed, 72 deletions(-) delete mode 100644 fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java diff --git a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java deleted file mode 100644 index c631317531ec11..00000000000000 --- a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java +++ /dev/null @@ -1,72 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.publish; - -import org.apache.doris.common.Config; -import org.apache.doris.common.ThreadPoolManager; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -// Fixed time scheduled publisher. -// You can register your routine publish here. -public class FixedTimePublisher { - private static FixedTimePublisher INSTANCE; - - private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Fixed-Time-Publisher"); - private ClusterStatePublisher publisher; - - public FixedTimePublisher(ClusterStatePublisher publisher) { - this.publisher = publisher; - } - - public static FixedTimePublisher getInstance() { - if (INSTANCE == null) { - INSTANCE = new FixedTimePublisher(ClusterStatePublisher.getInstance()); - } - return INSTANCE; - } - - public void register(Callback callback, long intervalMs) { - scheduler.scheduleAtFixedRate(new Worker(callback), 0, intervalMs, TimeUnit.MILLISECONDS); - } - - private class Worker implements Runnable { - private Callback callback; - - public Worker(Callback callback) { - this.callback = callback; - } - - @Override - public void run() { - ClusterStateUpdate.Builder builder = ClusterStateUpdate.builder(); - builder.addUpdate(callback.getTopicUpdate()); - ClusterStateUpdate state = builder.build(); - Listener listener = Listeners.nullToNoOpListener(callback.getListener()); - - publisher.publish(state, listener, Config.meta_publish_timeout_ms); - } - } - - public static interface Callback { - public TopicUpdate getTopicUpdate(); - - public Listener getListener(); - } -} From 57e2bda4c3b1db78a7732ef98ae943dd545e8153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 8 Jul 2020 11:07:16 +0800 Subject: [PATCH 5/9] fix comment --- .../java/org/apache/doris/common/ThreadPoolManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 5ae0a925c8a9af..61ed3d2b530855 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -48,7 +48,7 @@ * 3. newDaemonThreadPool * Wrapper over ThreadPoolExecutor, user can use it to construct thread pool more flexibly. * 4. newDaemonScheduledThreadPool - * Wrapper over ScheduledThreadPoolExecutor, but without thread num limit now(NOTICE). + * Wrapper over ScheduledThreadPoolExecutor, but without delay task num limit and thread num limit now(NOTICE). * * All thread pool constructed by ThreadPoolManager will be added to the nameToThreadPoolMap, * so the thread pool name in fe must be unique. @@ -116,9 +116,9 @@ public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, return threadPool; } - // Now, we have no thread num limit in ScheduledThreadPoolExecutor, - // so it may cause oom when there are too many threads in ScheduledThreadPoolExecutor - // Please use this api with caution. + // Now, we have no delay task num limit and thread num limit in ScheduledThreadPoolExecutor, + // so it may cause oom when there are too many delay tasks or threads in ScheduledThreadPoolExecutor + // Please use this api only for scheduling short task at fix rate. public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName) { ThreadFactory threadFactory = namedThreadFactory(poolName); ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); From f0fbdc5710b5d2583c5d80a03f74b3cc3b1b698b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 8 Jul 2020 13:41:15 +0800 Subject: [PATCH 6/9] Fix that thread pool in checkpoint catalog doesn't need register metric --- .../org/apache/doris/catalog/Catalog.java | 15 +++++++----- .../doris/common/ThreadPoolManager.java | 23 ++++++++++++------- .../org/apache/doris/common/ThriftServer.java | 4 ++-- .../common/publish/ClusterStatePublisher.java | 2 +- .../org/apache/doris/load/ExportChecker.java | 4 ++-- .../org/apache/doris/load/LoadChecker.java | 8 +++---- .../org/apache/doris/metric/MetricRepo.java | 2 +- .../org/apache/doris/mysql/MysqlServer.java | 2 +- .../apache/doris/mysql/nio/NMysqlServer.java | 2 +- .../org/apache/doris/qe/ConnectScheduler.java | 4 ++-- .../org/apache/doris/system/HeartbeatMgr.java | 4 ++-- .../apache/doris/task/AgentTaskExecutor.java | 2 +- .../apache/doris/task/MasterTaskExecutor.java | 6 ++--- .../org/apache/doris/task/PullLoadJobMgr.java | 4 ++-- .../doris/common/ThreadPoolManagerTest.java | 4 ++-- .../routineload/RoutineLoadSchedulerTest.java | 2 +- .../doris/task/MasterTaskExecutorTest.java | 2 +- 17 files changed, 50 insertions(+), 40 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 368742be3085be..e0a838c721b454 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -111,6 +111,7 @@ import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.Daemon; @@ -458,10 +459,10 @@ public DynamicPartitionScheduler getDynamicPartitionScheduler() { } private static class SingletonHolder { - private static final Catalog INSTANCE = new Catalog(); + private static final Catalog INSTANCE = new Catalog(false); } - private Catalog() { + private Catalog(boolean isCheckpointCatalog) { this.idToDb = new ConcurrentHashMap<>(); this.fullNameToDb = new ConcurrentHashMap<>(); this.load = new Load(); @@ -491,7 +492,7 @@ private Catalog() { this.masterIp = ""; this.systemInfo = new SystemInfoService(); - this.heartbeatMgr = new HeartbeatMgr(systemInfo); + this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog); this.tabletInvertedIndex = new TabletInvertedIndex(); this.colocateTableIndex = new ColocateTableIndex(); this.recycleBin = new CatalogRecycleBin(); @@ -505,7 +506,7 @@ private Catalog() { this.isDefaultClusterCreated = false; - this.pullLoadJobMgr = new PullLoadJobMgr(); + this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog); this.brokerMgr = new BrokerMgr(); this.resourceMgr = new ResourceMgr(); @@ -524,7 +525,7 @@ private Catalog() { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size); + this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); @@ -557,7 +558,7 @@ public static Catalog getCurrentCatalog() { // only checkpoint thread it self will goes here. // so no need to care about the thread safe. if (CHECKPOINT == null) { - CHECKPOINT = new Catalog(); + CHECKPOINT = new Catalog(true); } return CHECKPOINT; } else { @@ -1208,6 +1209,8 @@ private void transferToMaster() { String msg = "master finished to replay journal, can write now."; Util.stdoutWithTime(msg); LOG.info(msg); + // for master, there are some new thread pools need to register metric + ThreadPoolManager.registerAllThreadPoolMetric(); } /* diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 61ed3d2b530855..ca923a9e713648 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -68,6 +68,7 @@ public static void registerAllThreadPoolMetric() { for (Map.Entry entry : nameToThreadPoolMap.entrySet()) { registerThreadPoolMetric(entry.getKey(), entry.getValue()); } + nameToThreadPoolMap.clear(); } public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) { @@ -94,13 +95,14 @@ public Integer getValue() { } } - public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName) { - return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), new LogDiscardPolicy(poolName), poolName); + public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName, boolean needRegisterMetric) { + return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), + new LogDiscardPolicy(poolName), poolName, needRegisterMetric); } - public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName) { + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME ,TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize), - new BlockedPolicy(poolName, 60), poolName); + new BlockedPolicy(poolName, 60), poolName, needRegisterMetric); } public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, @@ -109,20 +111,25 @@ public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler, - String poolName) { + String poolName, + boolean needRegisterMetric) { ThreadFactory threadFactory = namedThreadFactory(poolName); ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - nameToThreadPoolMap.put(poolName, threadPool); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, threadPool); + } return threadPool; } // Now, we have no delay task num limit and thread num limit in ScheduledThreadPoolExecutor, // so it may cause oom when there are too many delay tasks or threads in ScheduledThreadPoolExecutor // Please use this api only for scheduling short task at fix rate. - public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName) { + public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName, boolean needRegisterMetric) { ThreadFactory threadFactory = namedThreadFactory(poolName); ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); - nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); + } return scheduledThreadPoolExecutor; } diff --git a/fe/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/src/main/java/org/apache/doris/common/ThriftServer.java index 3c293dd1fa2704..14eeaebc301b11 100644 --- a/fe/src/main/java/org/apache/doris/common/ThriftServer.java +++ b/fe/src/main/java/org/apache/doris/common/ThriftServer.java @@ -100,7 +100,7 @@ private void createThreadedServer() throws TTransportException { TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); - ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); + ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true); args.executorService(threadPoolExecutor); server = new TThreadedSelectorServer(args); } @@ -114,7 +114,7 @@ private void createThreadPoolServer() throws TTransportException { TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new TServerSocket(socketTransportArgs)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); - ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); + ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true); serverArgs.executorService(threadPoolExecutor); server = new TThreadPoolServer(serverArgs); } diff --git a/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java index f96e4b7d0f99a5..3a7cae77256b5c 100644 --- a/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java +++ b/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java @@ -41,7 +41,7 @@ public class ClusterStatePublisher { private static final Logger LOG = LogManager.getLogger(ClusterStatePublisher.class); private static ClusterStatePublisher INSTANCE; - private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher"); + private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true); private SystemInfoService clusterInfoService; diff --git a/fe/src/main/java/org/apache/doris/load/ExportChecker.java b/fe/src/main/java/org/apache/doris/load/ExportChecker.java index ccd79942279c2e..eccd6091d5c670 100644 --- a/fe/src/main/java/org/apache/doris/load/ExportChecker.java +++ b/fe/src/main/java/org/apache/doris/load/ExportChecker.java @@ -53,10 +53,10 @@ public static void init(long intervalMs) { checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs)); int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit; - MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize); + MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize, true); executors.put(JobState.PENDING, pendingTaskExecutor); - MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize); + MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize, true); executors.put(JobState.EXPORTING, exportingTaskExecutor); } diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/src/main/java/org/apache/doris/load/LoadChecker.java index 631d672df54e26..20620b5f0d4629 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java @@ -92,14 +92,14 @@ public static void init(long intervalMs) { Map pendingPriorityMap = Maps.newHashMap(); pendingPriorityMap.put(TPriority.NORMAL, - new MasterTaskExecutor("load_pending_thread_num_normal_priority", Config.load_pending_thread_num_normal_priority)); + new MasterTaskExecutor("load_pending_thread_num_normal_priority", Config.load_pending_thread_num_normal_priority, true)); pendingPriorityMap.put(TPriority.HIGH, - new MasterTaskExecutor("load_pending_thread_num_high_priority", Config.load_pending_thread_num_high_priority)); + new MasterTaskExecutor("load_pending_thread_num_high_priority", Config.load_pending_thread_num_high_priority, true)); executors.put(JobState.PENDING, pendingPriorityMap); Map etlPriorityMap = Maps.newHashMap(); - etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor("load_etl_thread_num_normal_priority", Config.load_etl_thread_num_normal_priority)); - etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor("load_etl_thread_num_high_priority", Config.load_etl_thread_num_high_priority)); + etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor("load_etl_thread_num_normal_priority", Config.load_etl_thread_num_normal_priority, true)); + etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor("load_etl_thread_num_high_priority", Config.load_etl_thread_num_high_priority, true)); executors.put(JobState.ETL, etlPriorityMap); } diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 029f5e6aa6156b..4e5e4e112cfb14 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -86,7 +86,7 @@ public final class MetricRepo { public static GaugeMetricImpl GAUGE_QUERY_ERR_RATE; public static GaugeMetricImpl GAUGE_MAX_TABLET_COMPACTION_SCORE; - private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool"); + private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true); private static MetricCalculator metricCalculator = new MetricCalculator(); public static synchronized void init() { diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java index 1cd099bb672e7f..ed1d599978668d 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java @@ -70,7 +70,7 @@ public boolean start() { } // start accept thread - listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener"); + listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener", true); running = true; listenerFuture = listener.submit(new Listener()); diff --git a/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java b/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java index c0ed55f7513b9a..964abaa045f9a4 100644 --- a/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java +++ b/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java @@ -46,7 +46,7 @@ public class NMysqlServer extends MysqlServer { private AcceptingChannel server; // default task service. - private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool"); + private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool", true); public NMysqlServer(int port, ConnectScheduler connectScheduler) { this.port = port; diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 8c2e8af67ae16a..5d431c476ba443 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -48,14 +48,14 @@ public class ConnectScheduler { private AtomicInteger nextConnectionId; private Map connectionMap = Maps.newHashMap(); private Map connByUser = Maps.newHashMap(); - private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool"); + private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true); // Use a thread to check whether connection is timeout. Because // 1. If use a scheduler, the task maybe a huge number when query is messy. // Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler. // 2. Use a thread to poll maybe lose some accurate, but is enough to us. private ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, - "Connect-Scheduler-Check-Timer"); + "Connect-Scheduler-Check-Timer", true); public ConnectScheduler(int maxConnections) { this.maxConnections = maxConnections; diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 6f391b419e2665..7948c6371254cd 100644 --- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -70,11 +70,11 @@ public class HeartbeatMgr extends MasterDaemon { private static volatile AtomicReference masterInfo = new AtomicReference<>(); - public HeartbeatMgr(SystemInfoService nodeMgr) { + public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000); this.nodeMgr = nodeMgr; this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num, - Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool"); + Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric); this.heartbeatFlags = new HeartbeatFlags(); } diff --git a/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java index 4ebd8d44244e79..a206bb309a3fee 100644 --- a/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java @@ -24,7 +24,7 @@ public class AgentTaskExecutor { - private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool"); + private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool", true); public AgentTaskExecutor() { diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 810c9cce27da58..d7b7348a76b923 100644 --- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -39,10 +39,10 @@ public class MasterTaskExecutor { private Map> runningTasks; public ScheduledThreadPoolExecutor scheduledThreadPool; - public MasterTaskExecutor(String name, int threadNum) { - executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool"); + public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric) { + executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool", needRegisterMetric); runningTasks = Maps.newHashMap(); - scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool"); + scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); } public void start() { diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java index ed8c3240377c32..50a80681ce3e4e 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java @@ -46,8 +46,8 @@ public class PullLoadJobMgr { private int concurrency = 10; - public PullLoadJobMgr() { - executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr"); + public PullLoadJobMgr(boolean needRegisterMetric) { + executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr", needRegisterMetric); } /** diff --git a/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java b/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java index 2ae2ea99e6a070..a31b15cc394355 100755 --- a/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java +++ b/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java @@ -29,9 +29,9 @@ public class ThreadPoolManagerTest { @Test public void testNormal() throws InterruptedException { - ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool"); + ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool", true); ThreadPoolExecutor testFixedThreaddPool = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, - "test_fixed_thread_pool"); + "test_fixed_thread_pool", true); ThreadPoolManager.registerThreadPoolMetric("test_cache_pool", testCachedPool); ThreadPoolManager.registerThreadPoolMetric("test_fixed_thread_pool", testFixedThreaddPool); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 2b51cdda83be42..7228a2eafc99e2 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -165,7 +165,7 @@ public void functionTest(@Mocked Catalog catalog, RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(); routineLoadTaskScheduler.setInterval(5000); - ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler"); + ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler", false); executorService.submit(routineLoadScheduler); executorService.submit(routineLoadTaskScheduler); diff --git a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java index 3b2471a68ba2eb..352e300b0941c9 100644 --- a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java +++ b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java @@ -33,7 +33,7 @@ public class MasterTaskExecutorTest { @Before public void setUp() { - executor = new MasterTaskExecutor("master_task_executor_test", THREAD_NUM); + executor = new MasterTaskExecutor("master_task_executor_test", THREAD_NUM, false); executor.start(); } From 7d19205a1de65cd598e1af4cce7555711aebb246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 8 Jul 2020 14:56:38 +0800 Subject: [PATCH 7/9] add comment about isCheckpointCatalog parameter in Catalog constructor --- fe/src/main/java/org/apache/doris/catalog/Catalog.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index e0a838c721b454..727dbe4d271faa 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -462,6 +462,7 @@ private static class SingletonHolder { private static final Catalog INSTANCE = new Catalog(false); } + // if isCheckpointCatalog is true, it means that we should not collect thread pool metric private Catalog(boolean isCheckpointCatalog) { this.idToDb = new ConcurrentHashMap<>(); this.fullNameToDb = new ConcurrentHashMap<>(); From 45267aafda32f06f109b1c17294b5540880f8b6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Wed, 15 Jul 2020 10:28:05 +0800 Subject: [PATCH 8/9] Add shutdown scheduledThreadPool when close MasterTaskExecutor --- fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index d7b7348a76b923..9b26556a3d10e9 100644 --- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -68,6 +68,7 @@ public boolean submit(MasterTask task) { } public void close() { + scheduledThreadPool.shutdown(); executor.shutdown(); runningTasks.clear(); } From a24c7c4c3ec4c79bbb4c632918048cccd06a1f8c Mon Sep 17 00:00:00 2001 From: caiconghui Date: Thu, 16 Jul 2020 00:25:42 +0800 Subject: [PATCH 9/9] fix unit test failed --- fe/src/main/java/org/apache/doris/catalog/Catalog.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 727dbe4d271faa..f77921d5887622 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -459,7 +459,11 @@ public DynamicPartitionScheduler getDynamicPartitionScheduler() { } private static class SingletonHolder { - private static final Catalog INSTANCE = new Catalog(false); + private static final Catalog INSTANCE = new Catalog(); + } + + private Catalog() { + this(false); } // if isCheckpointCatalog is true, it means that we should not collect thread pool metric