diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 2b73e27ccdd2e7..10d184d14ecbba 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -387,7 +387,9 @@ public class Catalog { private TabletChecker tabletChecker; - private MasterTaskExecutor loadTaskScheduler; + // Thread pools for pending and loading task, separately + private MasterTaskExecutor pendingLoadTaskScheduler; + private MasterTaskExecutor loadingLoadTaskScheduler; private LoadJobScheduler loadJobScheduler; @@ -537,7 +539,10 @@ private Catalog(boolean isCheckpointCatalog) { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog); + this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size, + Config.desired_max_waiting_jobs, !isCheckpointCatalog); + this.loadingLoadTaskScheduler = new MasterTaskExecutor("loading_load_task_scheduler", Config.async_load_task_pool_size, + Config.desired_max_waiting_jobs, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); @@ -1238,9 +1243,6 @@ public void fixBugAfterMetadataReplayed(boolean waitCatalogReady) { } } } - - LOG.info("start to fix meta data bug"); - loadManager.fixLoadJobMetaBugs(globalTransactionMgr); } // start all daemon threads only running on Master @@ -1262,7 +1264,8 @@ private void startMasterOnlyDaemonThreads() { LoadChecker.init(Config.load_checker_interval_second * 1000L); LoadChecker.startAll(); // New load scheduler - loadTaskScheduler.start(); + pendingLoadTaskScheduler.start(); + loadingLoadTaskScheduler.start(); loadManager.prepareJobs(); loadJobScheduler.start(); loadTimeoutChecker.start(); @@ -4822,8 +4825,12 @@ public LoadManager getLoadManager() { return loadManager; } - public MasterTaskExecutor getLoadTaskScheduler() { - return loadTaskScheduler; + public MasterTaskExecutor getPendingLoadTaskScheduler() { + return pendingLoadTaskScheduler; + } + + public MasterTaskExecutor getLoadingLoadTaskScheduler() { + return loadingLoadTaskScheduler; } public RoutineLoadManager getRoutineLoadManager() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index a253dbe919c8ed..719a4feb056e68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -40,8 +40,8 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -93,7 +93,7 @@ public void beginTxn() protected void unprotectedExecuteJob() { LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc); idToTasks.put(task.getSignature(), task); - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task); } /** @@ -170,9 +170,9 @@ private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) { private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException { // divide job into broker loading task by table + List newLoadingTasks = Lists.newArrayList(); db.readLock(); try { - List newLoadingTasks = Lists.newArrayList(); for (Map.Entry> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) { FileGroupAggKey aggKey = entry.getKey(); List brokerFileGroups = entry.getValue(); @@ -208,13 +208,15 @@ brokerFileGroups, getDeadlineMs(), execMemLimit, } txnState.addTableIndexes(table); } - // submit all tasks together - for (LoadTask loadTask : newLoadingTasks) { - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - } + } finally { db.readUnlock(); } + + // Submit task outside the database lock, cause it may take a while if task queue is full. + for (LoadTask loadTask : newLoadingTasks) { + Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask); + } } private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index a4cff888bf27e9..f50c80c123b205 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -45,7 +45,7 @@ public class BrokerLoadPendingTask extends LoadTask { public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, BrokerDesc brokerDesc) { - super(loadTaskCallback); + super(loadTaskCallback, TaskType.PENDING); this.retryTime = 3; this.attachment = new BrokerPendingTaskAttachment(signature); this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 1d649c65627bc0..c3a6ba5d6ff37a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.LogBuilder; @@ -43,14 +44,14 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -235,8 +236,15 @@ public void onTaskFailed(long taskId, FailMsg failMsg) { idToTasks.put(loadTask.getSignature(), loadTask); // load id will be added to loadStatistic when executing this task try { - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - } catch (RejectedExecutionException e) { + if (loadTask.getTaskType() == LoadTask.TaskType.PENDING) { + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(loadTask); + } else if (loadTask.getTaskType() == LoadTask.TaskType.LOADING) { + Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask); + } else { + throw new LoadException(String.format("Unknown load task type: %s. task id: %d, job id, %d", + loadTask.getTaskType(), loadTask.getSignature(), id)); + } + } catch (RejectedExecutionException | LoadException e) { unprotectedExecuteCancel(failMsg, true); logFinalOperation(); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index caa6081e7073e7..85c939f492a319 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -68,7 +68,7 @@ public LoadLoadingTask(Database db, OlapTable table, long jobDeadlineMs, long execMemLimit, boolean strictMode, long txnId, LoadTaskCallback callback, String timezone, long timeoutS) { - super(callback); + super(callback, TaskType.LOADING); this.db = db; this.table = table; this.brokerDesc = brokerDesc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 8b42ab7c10c540..33e3490a8d6c03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -45,6 +45,7 @@ import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; + import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -702,6 +703,8 @@ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId } } + @Deprecated + // Deprecated in version 0.12 // This method is only for bug fix. And should be call after image and edit log are replayed. public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { for (LoadJob job : idToLoadJob.values()) { @@ -738,11 +741,11 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { * replay process. This results in that when the FE restarts, these load jobs * that should have been completed are re-entered into the pending state, * resulting in repeated submission load tasks. - * + * * Those wrong images are unrecoverable, so that we have to cancel all load jobs * in PENDING or LOADING state when restarting FE, to avoid submit jobs * repeatedly. - * + * * This code can be remove when upgrading from 0.11.x to future version. */ if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) { @@ -773,7 +776,7 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { // it would be failed if FE restart. job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown" - + ". label: {}, db: {}", + + ". label: {}, db: {}", job.getId(), prevState, job.getLabel(), job.getDbId()); } else { // txn is not found. here are 2 cases: @@ -784,7 +787,7 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { // removed by expiration). // Without affecting the first case of job, we set the job finish time to be the same as the create time. // In this way, the second type of job will be automatically cleared after running removeOldLoadJob(); - + // use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false); LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index 9a4568a1bc10d5..6ee0cfe4e175af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -36,14 +36,21 @@ public enum MergeType { DELETE } + public enum TaskType { + PENDING, + LOADING + } + private static final Logger LOG = LogManager.getLogger(LoadTask.class); + protected TaskType taskType; protected LoadTaskCallback callback; protected TaskAttachment attachment; protected FailMsg failMsg = new FailMsg(); protected int retryTime = 1; - public LoadTask(LoadTaskCallback callback) { + public LoadTask(LoadTaskCallback callback, TaskType taskType) { + this.taskType = taskType; this.signature = Catalog.getCurrentCatalog().getNextId(); this.callback = callback; } @@ -96,4 +103,8 @@ public void updateRetryInfo() { this.retryTime--; this.signature = Catalog.getCurrentCatalog().getNextId(); } + + public TaskType getTaskType() { + return taskType; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 9297295748a407..6ee5d87955f99b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -83,6 +83,7 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -196,7 +197,7 @@ protected void unprotectedExecuteJob() throws LoadException { sparkResource, brokerDesc); task.init(); idToTasks.put(task.getSignature(), task); - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index dbf5247a435c02..7445e2b4e9e0ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -57,6 +57,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion; import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType; import org.apache.doris.transaction.TransactionState; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -88,7 +89,7 @@ public class SparkLoadPendingTask extends LoadTask { public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, SparkResource resource, BrokerDesc brokerDesc) { - super(loadTaskCallback); + super(loadTaskCallback, TaskType.PENDING); this.retryTime = 3; this.attachment = new SparkPendingTaskAttachment(signature); this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 9b26556a3d10e9..932e6080b71b56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -17,9 +17,10 @@ package org.apache.doris.task; +import org.apache.doris.common.ThreadPoolManager; + import com.google.common.collect.Maps; -import org.apache.doris.common.ThreadPoolManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,12 @@ public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); } + public MasterTaskExecutor(String name, int threadNum, int queueSize, boolean needRegisterMetric) { + executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, queueSize, name + "_pool", needRegisterMetric); + runningTasks = Maps.newHashMap(); + scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); + } + public void start() { scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 7f263d7a1615a0..c3c64d13edd2ab 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -61,6 +61,7 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -209,7 +210,7 @@ public void testExecute(@Mocked Catalog catalog, @Mocked SparkLoadPendingTask pe pendingTask.init(); pendingTask.getSignature(); result = pendingTaskId; - catalog.getLoadTaskScheduler(); + catalog.getPendingLoadTaskScheduler(); result = executor; executor.submit((SparkLoadPendingTask) any); result = true;