Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.Daemon;
Expand Down Expand Up @@ -462,6 +463,11 @@ private static class SingletonHolder {
}

private Catalog() {
this(false);
}

// if isCheckpointCatalog is true, it means that we should not collect thread pool metric
private Catalog(boolean isCheckpointCatalog) {
this.idToDb = new ConcurrentHashMap<>();
this.fullNameToDb = new ConcurrentHashMap<>();
this.load = new Load();
Expand Down Expand Up @@ -491,7 +497,7 @@ private Catalog() {
this.masterIp = "";

this.systemInfo = new SystemInfoService();
this.heartbeatMgr = new HeartbeatMgr(systemInfo);
this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog);
this.tabletInvertedIndex = new TabletInvertedIndex();
this.colocateTableIndex = new ColocateTableIndex();
this.recycleBin = new CatalogRecycleBin();
Expand All @@ -505,7 +511,7 @@ private Catalog() {

this.isDefaultClusterCreated = false;

this.pullLoadJobMgr = new PullLoadJobMgr();
this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog);
this.brokerMgr = new BrokerMgr();
this.resourceMgr = new ResourceMgr();

Expand All @@ -524,7 +530,7 @@ private Catalog() {
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);

this.loadTaskScheduler = new MasterTaskExecutor(Config.async_load_task_pool_size);
this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog);
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = new LoadManager(loadJobScheduler);
this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
Expand Down Expand Up @@ -557,7 +563,7 @@ public static Catalog getCurrentCatalog() {
// only checkpoint thread it self will goes here.
// so no need to care about the thread safe.
if (CHECKPOINT == null) {
CHECKPOINT = new Catalog();
CHECKPOINT = new Catalog(true);
}
return CHECKPOINT;
} else {
Expand Down Expand Up @@ -1208,6 +1214,8 @@ private void transferToMaster() {
String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// for master, there are some new thread pools need to register metric
ThreadPoolManager.registerAllThreadPoolMetric();
}

/*
Expand Down Expand Up @@ -1247,6 +1255,7 @@ private void startMasterOnlyDaemonThreads() {
LoadChecker.init(Config.load_checker_interval_second * 1000L);
LoadChecker.startAll();
// New load scheduler
loadTaskScheduler.start();
loadManager.prepareJobs();
loadJobScheduler.start();
loadTimeoutChecker.start();
Expand Down
32 changes: 22 additions & 10 deletions fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@
/**
* ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource.
* thread names in thread pool are formatted as poolName-ID, where ID is a unique, sequentially assigned integer.
* it provide three functions to construct thread pool now.
* it provide four functions to construct thread pool now.
*
* 1. newDaemonCacheThreadPool
* Wrapper over newCachedThreadPool with additional maxNumThread limit.
* 2. newDaemonFixedThreadPool
* Wrapper over newCachedThreadPool with additional blocking queue capacity limit.
* 3. newDaemonThreadPool
* Wrapper over ThreadPoolExecutor, user can use it to construct thread pool more flexibly.
* 4. newDaemonScheduledThreadPool
* Wrapper over ScheduledThreadPoolExecutor, but without delay task num limit and thread num limit now(NOTICE).
*
* All thread pool constructed by ThreadPoolManager will be added to the nameToThreadPoolMap,
* so the thread pool name in fe must be unique.
Expand All @@ -66,6 +68,7 @@ public static void registerAllThreadPoolMetric() {
for (Map.Entry<String, ThreadPoolExecutor> entry : nameToThreadPoolMap.entrySet()) {
registerThreadPoolMetric(entry.getKey(), entry.getValue());
}
nameToThreadPoolMap.clear();
}

public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) {
Expand All @@ -92,13 +95,14 @@ public Integer getValue() {
}
}

public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName) {
return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), new LogDiscardPolicy(poolName), poolName);
public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(),
new LogDiscardPolicy(poolName), poolName, needRegisterMetric);
}

public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName) {
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME ,TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
new BlockedPolicy(poolName, 60), poolName);
new BlockedPolicy(poolName, 60), poolName, needRegisterMetric);
}

public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize,
Expand All @@ -107,17 +111,25 @@ public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler,
String poolName) {
String poolName,
boolean needRegisterMetric) {
ThreadFactory threadFactory = namedThreadFactory(poolName);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
nameToThreadPoolMap.put(poolName, threadPool);
if (needRegisterMetric) {
nameToThreadPoolMap.put(poolName, threadPool);
}
return threadPool;
}

public static ScheduledThreadPoolExecutor newScheduledThreadPool(int maxNumThread, String poolName) {
// Now, we have no delay task num limit and thread num limit in ScheduledThreadPoolExecutor,
// so it may cause oom when there are too many delay tasks or threads in ScheduledThreadPoolExecutor
// Please use this api only for scheduling short task at fix rate.
public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName, boolean needRegisterMetric) {
ThreadFactory threadFactory = namedThreadFactory(poolName);
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxNumThread, threadFactory);
nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor);
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
if (needRegisterMetric) {
nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor);
}
return scheduledThreadPoolExecutor;
}

Expand Down
4 changes: 2 additions & 2 deletions fe/src/main/java/org/apache/doris/common/ThriftServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void createThreadedServer() throws TTransportException {
TThreadedSelectorServer.Args args =
new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port)).protocolFactory(
new TBinaryProtocol.Factory()).processor(processor);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool");
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true);
args.executorService(threadPoolExecutor);
server = new TThreadedSelectorServer(args);
}
Expand All @@ -114,7 +114,7 @@ private void createThreadPoolServer() throws TTransportException {
TThreadPoolServer.Args serverArgs =
new TThreadPoolServer.Args(new TServerSocket(socketTransportArgs)).protocolFactory(
new TBinaryProtocol.Factory()).processor(processor);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool");
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true);
serverArgs.executorService(threadPoolExecutor);
server = new TThreadPoolServer(serverArgs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ClusterStatePublisher {
private static final Logger LOG = LogManager.getLogger(ClusterStatePublisher.class);
private static ClusterStatePublisher INSTANCE;

private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher");
private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true);

private SystemInfoService clusterInfoService;

Expand Down

This file was deleted.

7 changes: 5 additions & 2 deletions fe/src/main/java/org/apache/doris/load/ExportChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,20 @@ public static void init(long intervalMs) {
checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs));

int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit;
MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor(poolSize);
MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize, true);
executors.put(JobState.PENDING, pendingTaskExecutor);

MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor(poolSize);
MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize, true);
executors.put(JobState.EXPORTING, exportingTaskExecutor);
}

public static void startAll() {
for (ExportChecker exportChecker : checkers.values()) {
exportChecker.start();
}
for (MasterTaskExecutor masterTaskExecutor : executors.values()) {
masterTaskExecutor.start();
}
}

@Override
Expand Down
13 changes: 9 additions & 4 deletions fe/src/main/java/org/apache/doris/load/LoadChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ public static void init(long intervalMs) {

Map<TPriority, MasterTaskExecutor> pendingPriorityMap = Maps.newHashMap();
pendingPriorityMap.put(TPriority.NORMAL,
new MasterTaskExecutor(Config.load_pending_thread_num_normal_priority));
new MasterTaskExecutor("load_pending_thread_num_normal_priority", Config.load_pending_thread_num_normal_priority, true));
pendingPriorityMap.put(TPriority.HIGH,
new MasterTaskExecutor(Config.load_pending_thread_num_high_priority));
new MasterTaskExecutor("load_pending_thread_num_high_priority", Config.load_pending_thread_num_high_priority, true));
executors.put(JobState.PENDING, pendingPriorityMap);

Map<TPriority, MasterTaskExecutor> etlPriorityMap = Maps.newHashMap();
etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor(Config.load_etl_thread_num_normal_priority));
etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor(Config.load_etl_thread_num_high_priority));
etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor("load_etl_thread_num_normal_priority", Config.load_etl_thread_num_normal_priority, true));
etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor("load_etl_thread_num_high_priority", Config.load_etl_thread_num_high_priority, true));
executors.put(JobState.ETL, etlPriorityMap);
}

Expand All @@ -110,6 +110,11 @@ public static void startAll() {
for (LoadChecker loadChecker : checkers.values()) {
loadChecker.start();
}
for (Map<TPriority, MasterTaskExecutor> map : executors.values()) {
for (MasterTaskExecutor masterTaskExecutor : map.values()) {
masterTaskExecutor.start();
}
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/metric/MetricRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public final class MetricRepo {
public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;

private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool");
private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true);
private static MetricCalculator metricCalculator = new MetricCalculator();

public static synchronized void init() {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/mysql/MysqlServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public boolean start() {
}

// start accept thread
listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener");
listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener", true);
running = true;
listenerFuture = listener.submit(new Listener());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class NMysqlServer extends MysqlServer {
private AcceptingChannel<StreamConnection> server;

// default task service.
private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool");
private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool", true);

public NMysqlServer(int port, ConnectScheduler connectScheduler) {
this.port = port;
Expand Down
6 changes: 3 additions & 3 deletions fe/src/main/java/org/apache/doris/qe/ConnectScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ public class ConnectScheduler {
private AtomicInteger nextConnectionId;
private Map<Long, ConnectContext> connectionMap = Maps.newHashMap();
private Map<String, AtomicInteger> connByUser = Maps.newHashMap();
private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool");
private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true);

// Use a thread to check whether connection is timeout. Because
// 1. If use a scheduler, the task maybe a huge number when query is messy.
// Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.
// 2. Use a thread to poll maybe lose some accurate, but is enough to us.
private ScheduledExecutorService checkTimer = ThreadPoolManager.newScheduledThreadPool(1,
"Connect-Scheduler-Check-Timer");
private ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,
"Connect-Scheduler-Check-Timer", true);

public ConnectScheduler(int maxConnections) {
this.maxConnections = maxConnections;
Expand Down
4 changes: 2 additions & 2 deletions fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public class HeartbeatMgr extends MasterDaemon {

private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<>();

public HeartbeatMgr(SystemInfoService nodeMgr) {
public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
this.nodeMgr = nodeMgr;
this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool");
Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric);
this.heartbeatFlags = new HeartbeatFlags();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class AgentTaskExecutor {

private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool");
private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool", true);

public AgentTaskExecutor() {

Expand Down
Loading