diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 0c0251bcdde292..f77921d5887622 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -111,6 +111,7 @@ import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.Daemon; @@ -462,6 +463,11 @@ private static class SingletonHolder { } private Catalog() { + this(false); + } + + // if isCheckpointCatalog is true, it means that we should not collect thread pool metric + private Catalog(boolean isCheckpointCatalog) { this.idToDb = new ConcurrentHashMap<>(); this.fullNameToDb = new ConcurrentHashMap<>(); this.load = new Load(); @@ -491,7 +497,7 @@ private Catalog() { this.masterIp = ""; this.systemInfo = new SystemInfoService(); - this.heartbeatMgr = new HeartbeatMgr(systemInfo); + this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog); this.tabletInvertedIndex = new TabletInvertedIndex(); this.colocateTableIndex = new ColocateTableIndex(); this.recycleBin = new CatalogRecycleBin(); @@ -505,7 +511,7 @@ private Catalog() { this.isDefaultClusterCreated = false; - this.pullLoadJobMgr = new PullLoadJobMgr(); + this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog); this.brokerMgr = new BrokerMgr(); this.resourceMgr = new ResourceMgr(); @@ -524,7 +530,7 @@ private Catalog() { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor(Config.async_load_task_pool_size); + this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); @@ -557,7 +563,7 @@ public static Catalog getCurrentCatalog() { // only checkpoint thread it self will goes here. // so no need to care about the thread safe. if (CHECKPOINT == null) { - CHECKPOINT = new Catalog(); + CHECKPOINT = new Catalog(true); } return CHECKPOINT; } else { @@ -1208,6 +1214,8 @@ private void transferToMaster() { String msg = "master finished to replay journal, can write now."; Util.stdoutWithTime(msg); LOG.info(msg); + // for master, there are some new thread pools need to register metric + ThreadPoolManager.registerAllThreadPoolMetric(); } /* @@ -1247,6 +1255,7 @@ private void startMasterOnlyDaemonThreads() { LoadChecker.init(Config.load_checker_interval_second * 1000L); LoadChecker.startAll(); // New load scheduler + loadTaskScheduler.start(); loadManager.prepareJobs(); loadJobScheduler.start(); loadTimeoutChecker.start(); diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java index c177cf7b1822a6..ca923a9e713648 100644 --- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -39,7 +39,7 @@ /** * ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource. * thread names in thread pool are formatted as poolName-ID, where ID is a unique, sequentially assigned integer. - * it provide three functions to construct thread pool now. + * it provide four functions to construct thread pool now. * * 1. newDaemonCacheThreadPool * Wrapper over newCachedThreadPool with additional maxNumThread limit. @@ -47,6 +47,8 @@ * Wrapper over newCachedThreadPool with additional blocking queue capacity limit. * 3. newDaemonThreadPool * Wrapper over ThreadPoolExecutor, user can use it to construct thread pool more flexibly. + * 4. newDaemonScheduledThreadPool + * Wrapper over ScheduledThreadPoolExecutor, but without delay task num limit and thread num limit now(NOTICE). * * All thread pool constructed by ThreadPoolManager will be added to the nameToThreadPoolMap, * so the thread pool name in fe must be unique. @@ -66,6 +68,7 @@ public static void registerAllThreadPoolMetric() { for (Map.Entry entry : nameToThreadPoolMap.entrySet()) { registerThreadPoolMetric(entry.getKey(), entry.getValue()); } + nameToThreadPoolMap.clear(); } public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) { @@ -92,13 +95,14 @@ public Integer getValue() { } } - public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName) { - return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), new LogDiscardPolicy(poolName), poolName); + public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName, boolean needRegisterMetric) { + return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), + new LogDiscardPolicy(poolName), poolName, needRegisterMetric); } - public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName) { + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME ,TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize), - new BlockedPolicy(poolName, 60), poolName); + new BlockedPolicy(poolName, 60), poolName, needRegisterMetric); } public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, @@ -107,17 +111,25 @@ public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler, - String poolName) { + String poolName, + boolean needRegisterMetric) { ThreadFactory threadFactory = namedThreadFactory(poolName); ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - nameToThreadPoolMap.put(poolName, threadPool); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, threadPool); + } return threadPool; } - public static ScheduledThreadPoolExecutor newScheduledThreadPool(int maxNumThread, String poolName) { + // Now, we have no delay task num limit and thread num limit in ScheduledThreadPoolExecutor, + // so it may cause oom when there are too many delay tasks or threads in ScheduledThreadPoolExecutor + // Please use this api only for scheduling short task at fix rate. + public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName, boolean needRegisterMetric) { ThreadFactory threadFactory = namedThreadFactory(poolName); - ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxNumThread, threadFactory); - nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor); + } return scheduledThreadPoolExecutor; } diff --git a/fe/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/src/main/java/org/apache/doris/common/ThriftServer.java index 3c293dd1fa2704..14eeaebc301b11 100644 --- a/fe/src/main/java/org/apache/doris/common/ThriftServer.java +++ b/fe/src/main/java/org/apache/doris/common/ThriftServer.java @@ -100,7 +100,7 @@ private void createThreadedServer() throws TTransportException { TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); - ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); + ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true); args.executorService(threadPoolExecutor); server = new TThreadedSelectorServer(args); } @@ -114,7 +114,7 @@ private void createThreadPoolServer() throws TTransportException { TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new TServerSocket(socketTransportArgs)).protocolFactory( new TBinaryProtocol.Factory()).processor(processor); - ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool"); + ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true); serverArgs.executorService(threadPoolExecutor); server = new TThreadPoolServer(serverArgs); } diff --git a/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java index f96e4b7d0f99a5..3a7cae77256b5c 100644 --- a/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java +++ b/fe/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java @@ -41,7 +41,7 @@ public class ClusterStatePublisher { private static final Logger LOG = LogManager.getLogger(ClusterStatePublisher.class); private static ClusterStatePublisher INSTANCE; - private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher"); + private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true); private SystemInfoService clusterInfoService; diff --git a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java b/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java deleted file mode 100644 index 658068db5c3503..00000000000000 --- a/fe/src/main/java/org/apache/doris/common/publish/FixedTimePublisher.java +++ /dev/null @@ -1,72 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.publish; - -import org.apache.doris.common.Config; -import org.apache.doris.common.ThreadPoolManager; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -// Fixed time scheduled publisher. -// You can register your routine publish here. -public class FixedTimePublisher { - private static FixedTimePublisher INSTANCE; - - private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newScheduledThreadPool(1, "Fixed-Time-Publisher"); - private ClusterStatePublisher publisher; - - public FixedTimePublisher(ClusterStatePublisher publisher) { - this.publisher = publisher; - } - - public static FixedTimePublisher getInstance() { - if (INSTANCE == null) { - INSTANCE = new FixedTimePublisher(ClusterStatePublisher.getInstance()); - } - return INSTANCE; - } - - public void register(Callback callback, long intervalMs) { - scheduler.scheduleAtFixedRate(new Worker(callback), 0, intervalMs, TimeUnit.MILLISECONDS); - } - - private class Worker implements Runnable { - private Callback callback; - - public Worker(Callback callback) { - this.callback = callback; - } - - @Override - public void run() { - ClusterStateUpdate.Builder builder = ClusterStateUpdate.builder(); - builder.addUpdate(callback.getTopicUpdate()); - ClusterStateUpdate state = builder.build(); - Listener listener = Listeners.nullToNoOpListener(callback.getListener()); - - publisher.publish(state, listener, Config.meta_publish_timeout_ms); - } - } - - public static interface Callback { - public TopicUpdate getTopicUpdate(); - - public Listener getListener(); - } -} diff --git a/fe/src/main/java/org/apache/doris/load/ExportChecker.java b/fe/src/main/java/org/apache/doris/load/ExportChecker.java index 414cfbd774d63e..eccd6091d5c670 100644 --- a/fe/src/main/java/org/apache/doris/load/ExportChecker.java +++ b/fe/src/main/java/org/apache/doris/load/ExportChecker.java @@ -53,10 +53,10 @@ public static void init(long intervalMs) { checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs)); int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit; - MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor(poolSize); + MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize, true); executors.put(JobState.PENDING, pendingTaskExecutor); - MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor(poolSize); + MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize, true); executors.put(JobState.EXPORTING, exportingTaskExecutor); } @@ -64,6 +64,9 @@ public static void startAll() { for (ExportChecker exportChecker : checkers.values()) { exportChecker.start(); } + for (MasterTaskExecutor masterTaskExecutor : executors.values()) { + masterTaskExecutor.start(); + } } @Override diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/src/main/java/org/apache/doris/load/LoadChecker.java index d6e0cb7d862f7a..20620b5f0d4629 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java @@ -92,14 +92,14 @@ public static void init(long intervalMs) { Map pendingPriorityMap = Maps.newHashMap(); pendingPriorityMap.put(TPriority.NORMAL, - new MasterTaskExecutor(Config.load_pending_thread_num_normal_priority)); + new MasterTaskExecutor("load_pending_thread_num_normal_priority", Config.load_pending_thread_num_normal_priority, true)); pendingPriorityMap.put(TPriority.HIGH, - new MasterTaskExecutor(Config.load_pending_thread_num_high_priority)); + new MasterTaskExecutor("load_pending_thread_num_high_priority", Config.load_pending_thread_num_high_priority, true)); executors.put(JobState.PENDING, pendingPriorityMap); Map etlPriorityMap = Maps.newHashMap(); - etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor(Config.load_etl_thread_num_normal_priority)); - etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor(Config.load_etl_thread_num_high_priority)); + etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor("load_etl_thread_num_normal_priority", Config.load_etl_thread_num_normal_priority, true)); + etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor("load_etl_thread_num_high_priority", Config.load_etl_thread_num_high_priority, true)); executors.put(JobState.ETL, etlPriorityMap); } @@ -110,6 +110,11 @@ public static void startAll() { for (LoadChecker loadChecker : checkers.values()) { loadChecker.start(); } + for (Map map : executors.values()) { + for (MasterTaskExecutor masterTaskExecutor : map.values()) { + masterTaskExecutor.start(); + } + } } @Override diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 8d4139948268c8..4e5e4e112cfb14 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -86,7 +86,7 @@ public final class MetricRepo { public static GaugeMetricImpl GAUGE_QUERY_ERR_RATE; public static GaugeMetricImpl GAUGE_MAX_TABLET_COMPACTION_SCORE; - private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool"); + private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true); private static MetricCalculator metricCalculator = new MetricCalculator(); public static synchronized void init() { diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java index 1cd099bb672e7f..ed1d599978668d 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlServer.java @@ -70,7 +70,7 @@ public boolean start() { } // start accept thread - listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener"); + listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener", true); running = true; listenerFuture = listener.submit(new Listener()); diff --git a/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java b/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java index c0ed55f7513b9a..964abaa045f9a4 100644 --- a/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java +++ b/fe/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java @@ -46,7 +46,7 @@ public class NMysqlServer extends MysqlServer { private AcceptingChannel server; // default task service. - private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool"); + private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool", true); public NMysqlServer(int port, ConnectScheduler connectScheduler) { this.port = port; diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 6f4ec55c21782f..5d431c476ba443 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -48,14 +48,14 @@ public class ConnectScheduler { private AtomicInteger nextConnectionId; private Map connectionMap = Maps.newHashMap(); private Map connByUser = Maps.newHashMap(); - private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool"); + private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true); // Use a thread to check whether connection is timeout. Because // 1. If use a scheduler, the task maybe a huge number when query is messy. // Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler. // 2. Use a thread to poll maybe lose some accurate, but is enough to us. - private ScheduledExecutorService checkTimer = ThreadPoolManager.newScheduledThreadPool(1, - "Connect-Scheduler-Check-Timer"); + private ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, + "Connect-Scheduler-Check-Timer", true); public ConnectScheduler(int maxConnections) { this.maxConnections = maxConnections; diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 6f391b419e2665..7948c6371254cd 100644 --- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -70,11 +70,11 @@ public class HeartbeatMgr extends MasterDaemon { private static volatile AtomicReference masterInfo = new AtomicReference<>(); - public HeartbeatMgr(SystemInfoService nodeMgr) { + public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000); this.nodeMgr = nodeMgr; this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num, - Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool"); + Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric); this.heartbeatFlags = new HeartbeatFlags(); } diff --git a/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java index 4ebd8d44244e79..a206bb309a3fee 100644 --- a/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/AgentTaskExecutor.java @@ -24,7 +24,7 @@ public class AgentTaskExecutor { - private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool"); + private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool", true); public AgentTaskExecutor() { diff --git a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 90aad6b5c949e5..9b26556a3d10e9 100644 --- a/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -27,21 +27,28 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + public class MasterTaskExecutor { private static final Logger LOG = LogManager.getLogger(MasterTaskExecutor.class); - private ScheduledExecutorService executor; + private ThreadPoolExecutor executor; private Map> runningTasks; + public ScheduledThreadPoolExecutor scheduledThreadPool; - public MasterTaskExecutor(int threadNum) { - executor = ThreadPoolManager.newScheduledThreadPool(threadNum, "Master-Task-Executor-Pool"); + public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric) { + executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool", needRegisterMetric); runningTasks = Maps.newHashMap(); - executor.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); + scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); } - + + public void start() { + scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); + } + /** * submit task to task executor * @param task @@ -61,6 +68,7 @@ public boolean submit(MasterTask task) { } public void close() { + scheduledThreadPool.shutdown(); executor.shutdown(); runningTasks.clear(); } @@ -70,7 +78,7 @@ public int getTaskNum() { return runningTasks.size(); } } - + private class TaskChecker implements Runnable { @Override public void run() { diff --git a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java index ed8c3240377c32..50a80681ce3e4e 100644 --- a/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java +++ b/fe/src/main/java/org/apache/doris/task/PullLoadJobMgr.java @@ -46,8 +46,8 @@ public class PullLoadJobMgr { private int concurrency = 10; - public PullLoadJobMgr() { - executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr"); + public PullLoadJobMgr(boolean needRegisterMetric) { + executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr", needRegisterMetric); } /** diff --git a/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java b/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java index 2ae2ea99e6a070..a31b15cc394355 100755 --- a/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java +++ b/fe/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java @@ -29,9 +29,9 @@ public class ThreadPoolManagerTest { @Test public void testNormal() throws InterruptedException { - ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool"); + ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool", true); ThreadPoolExecutor testFixedThreaddPool = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, - "test_fixed_thread_pool"); + "test_fixed_thread_pool", true); ThreadPoolManager.registerThreadPoolMetric("test_cache_pool", testCachedPool); ThreadPoolManager.registerThreadPoolMetric("test_fixed_thread_pool", testFixedThreaddPool); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 2b51cdda83be42..7228a2eafc99e2 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -165,7 +165,7 @@ public void functionTest(@Mocked Catalog catalog, RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(); routineLoadTaskScheduler.setInterval(5000); - ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler"); + ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler", false); executorService.submit(routineLoadScheduler); executorService.submit(routineLoadTaskScheduler); diff --git a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java index 8bc68158100bcf..352e300b0941c9 100644 --- a/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java +++ b/fe/src/test/java/org/apache/doris/task/MasterTaskExecutorTest.java @@ -33,7 +33,8 @@ public class MasterTaskExecutorTest { @Before public void setUp() { - executor = new MasterTaskExecutor(THREAD_NUM); + executor = new MasterTaskExecutor("master_task_executor_test", THREAD_NUM, false); + executor.start(); } @After