From c1d48e7fbdb64a731a680d87c5e40406feaaf9f6 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 9 Nov 2020 13:47:13 +0800 Subject: [PATCH 1/3] [Bug] Fix some bugs of load job scheduler 1. The fix load meta bug logic should be removed since 0.12. 2. The load task thread pool's waiting queue should be as long as desired pending jobs num. 3. Submit the load task outside database lock to prevent holding lock for long time. --- .../java/org/apache/doris/catalog/Catalog.java | 6 ++---- .../apache/doris/load/loadv2/BrokerLoadJob.java | 14 ++++++++------ .../org/apache/doris/load/loadv2/LoadManager.java | 11 +++++++---- .../org/apache/doris/task/MasterTaskExecutor.java | 9 ++++++++- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 2b73e27ccdd2e7..02f3c53280697e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -537,7 +537,8 @@ private Catalog(boolean isCheckpointCatalog) { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog); + this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, + Config.desired_max_waiting_jobs, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); @@ -1238,9 +1239,6 @@ public void fixBugAfterMetadataReplayed(boolean waitCatalogReady) { } } } - - LOG.info("start to fix meta data bug"); - loadManager.fixLoadJobMetaBugs(globalTransactionMgr); } // start all daemon threads only running on Master diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index a253dbe919c8ed..129fb4a440490d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -40,8 +40,8 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -170,9 +170,9 @@ private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) { private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException { // divide job into broker loading task by table + List newLoadingTasks = Lists.newArrayList(); db.readLock(); try { - List newLoadingTasks = Lists.newArrayList(); for (Map.Entry> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) { FileGroupAggKey aggKey = entry.getKey(); List brokerFileGroups = entry.getValue(); @@ -208,13 +208,15 @@ brokerFileGroups, getDeadlineMs(), execMemLimit, } txnState.addTableIndexes(table); } - // submit all tasks together - for (LoadTask loadTask : newLoadingTasks) { - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - } + } finally { db.readUnlock(); } + + // Submit task outside the database lock, cause it may take a while if task queue is full. + for (LoadTask loadTask : newLoadingTasks) { + Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); + } } private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 8b42ab7c10c540..33e3490a8d6c03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -45,6 +45,7 @@ import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; + import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -702,6 +703,8 @@ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId } } + @Deprecated + // Deprecated in version 0.12 // This method is only for bug fix. And should be call after image and edit log are replayed. public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { for (LoadJob job : idToLoadJob.values()) { @@ -738,11 +741,11 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { * replay process. This results in that when the FE restarts, these load jobs * that should have been completed are re-entered into the pending state, * resulting in repeated submission load tasks. - * + * * Those wrong images are unrecoverable, so that we have to cancel all load jobs * in PENDING or LOADING state when restarting FE, to avoid submit jobs * repeatedly. - * + * * This code can be remove when upgrading from 0.11.x to future version. */ if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) { @@ -773,7 +776,7 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { // it would be failed if FE restart. job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown" - + ". label: {}, db: {}", + + ". label: {}, db: {}", job.getId(), prevState, job.getLabel(), job.getDbId()); } else { // txn is not found. here are 2 cases: @@ -784,7 +787,7 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { // removed by expiration). // Without affecting the first case of job, we set the job finish time to be the same as the create time. // In this way, the second type of job will be automatically cleared after running removeOldLoadJob(); - + // use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false); LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 9b26556a3d10e9..932e6080b71b56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -17,9 +17,10 @@ package org.apache.doris.task; +import org.apache.doris.common.ThreadPoolManager; + import com.google.common.collect.Maps; -import org.apache.doris.common.ThreadPoolManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,12 @@ public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); } + public MasterTaskExecutor(String name, int threadNum, int queueSize, boolean needRegisterMetric) { + executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, queueSize, name + "_pool", needRegisterMetric); + runningTasks = Maps.newHashMap(); + scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); + } + public void start() { scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); } From 593a239971d836533203413ab0a6f37b39c89efa Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 9 Nov 2020 14:28:07 +0800 Subject: [PATCH 2/3] 2 --- .../org/apache/doris/catalog/Catalog.java | 19 ++++++++++++++----- .../doris/load/loadv2/BrokerLoadJob.java | 4 ++-- .../load/loadv2/BrokerLoadPendingTask.java | 8 +++++++- .../apache/doris/load/loadv2/BulkLoadJob.java | 18 +++++++++++++----- .../doris/load/loadv2/LoadLoadingTask.java | 2 +- .../apache/doris/load/loadv2/LoadTask.java | 13 ++++++++++++- .../doris/load/loadv2/SparkLoadJob.java | 3 ++- .../load/loadv2/SparkLoadPendingTask.java | 3 ++- .../doris/load/loadv2/SparkLoadJobTest.java | 3 ++- 9 files changed, 55 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 02f3c53280697e..10d184d14ecbba 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -387,7 +387,9 @@ public class Catalog { private TabletChecker tabletChecker; - private MasterTaskExecutor loadTaskScheduler; + // Thread pools for pending and loading task, separately + private MasterTaskExecutor pendingLoadTaskScheduler; + private MasterTaskExecutor loadingLoadTaskScheduler; private LoadJobScheduler loadJobScheduler; @@ -537,7 +539,9 @@ private Catalog(boolean isCheckpointCatalog) { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, + this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size, + Config.desired_max_waiting_jobs, !isCheckpointCatalog); + this.loadingLoadTaskScheduler = new MasterTaskExecutor("loading_load_task_scheduler", Config.async_load_task_pool_size, Config.desired_max_waiting_jobs, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); @@ -1260,7 +1264,8 @@ private void startMasterOnlyDaemonThreads() { LoadChecker.init(Config.load_checker_interval_second * 1000L); LoadChecker.startAll(); // New load scheduler - loadTaskScheduler.start(); + pendingLoadTaskScheduler.start(); + loadingLoadTaskScheduler.start(); loadManager.prepareJobs(); loadJobScheduler.start(); loadTimeoutChecker.start(); @@ -4820,8 +4825,12 @@ public LoadManager getLoadManager() { return loadManager; } - public MasterTaskExecutor getLoadTaskScheduler() { - return loadTaskScheduler; + public MasterTaskExecutor getPendingLoadTaskScheduler() { + return pendingLoadTaskScheduler; + } + + public MasterTaskExecutor getLoadingLoadTaskScheduler() { + return loadingLoadTaskScheduler; } public RoutineLoadManager getRoutineLoadManager() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 129fb4a440490d..719a4feb056e68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -93,7 +93,7 @@ public void beginTxn() protected void unprotectedExecuteJob() { LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc); idToTasks.put(task.getSignature(), task); - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task); } /** @@ -215,7 +215,7 @@ brokerFileGroups, getDeadlineMs(), execMemLimit, // Submit task outside the database lock, cause it may take a while if task queue is full. for (LoadTask loadTask : newLoadingTasks) { - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); + Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index a4cff888bf27e9..7cfa842ba2e1d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -45,7 +45,7 @@ public class BrokerLoadPendingTask extends LoadTask { public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, BrokerDesc brokerDesc) { - super(loadTaskCallback); + super(loadTaskCallback, TaskType.PENDING); this.retryTime = 3; this.attachment = new BrokerPendingTaskAttachment(signature); this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; @@ -57,6 +57,12 @@ public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback, void executeTask() throws UserException { LOG.info("begin to execute broker pending task. job: {}", callback.getCallbackId()); getAllFileStatus(); + + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } private void getAllFileStatus() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 1d649c65627bc0..c3a6ba5d6ff37a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.LogBuilder; @@ -43,14 +44,14 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -235,8 +236,15 @@ public void onTaskFailed(long taskId, FailMsg failMsg) { idToTasks.put(loadTask.getSignature(), loadTask); // load id will be added to loadStatistic when executing this task try { - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - } catch (RejectedExecutionException e) { + if (loadTask.getTaskType() == LoadTask.TaskType.PENDING) { + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(loadTask); + } else if (loadTask.getTaskType() == LoadTask.TaskType.LOADING) { + Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask); + } else { + throw new LoadException(String.format("Unknown load task type: %s. task id: %d, job id, %d", + loadTask.getTaskType(), loadTask.getSignature(), id)); + } + } catch (RejectedExecutionException | LoadException e) { unprotectedExecuteCancel(failMsg, true); logFinalOperation(); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index caa6081e7073e7..85c939f492a319 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -68,7 +68,7 @@ public LoadLoadingTask(Database db, OlapTable table, long jobDeadlineMs, long execMemLimit, boolean strictMode, long txnId, LoadTaskCallback callback, String timezone, long timeoutS) { - super(callback); + super(callback, TaskType.LOADING); this.db = db; this.table = table; this.brokerDesc = brokerDesc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index 9a4568a1bc10d5..6ee0cfe4e175af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -36,14 +36,21 @@ public enum MergeType { DELETE } + public enum TaskType { + PENDING, + LOADING + } + private static final Logger LOG = LogManager.getLogger(LoadTask.class); + protected TaskType taskType; protected LoadTaskCallback callback; protected TaskAttachment attachment; protected FailMsg failMsg = new FailMsg(); protected int retryTime = 1; - public LoadTask(LoadTaskCallback callback) { + public LoadTask(LoadTaskCallback callback, TaskType taskType) { + this.taskType = taskType; this.signature = Catalog.getCurrentCatalog().getNextId(); this.callback = callback; } @@ -96,4 +103,8 @@ public void updateRetryInfo() { this.retryTime--; this.signature = Catalog.getCurrentCatalog().getNextId(); } + + public TaskType getTaskType() { + return taskType; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 9297295748a407..6ee5d87955f99b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -83,6 +83,7 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -196,7 +197,7 @@ protected void unprotectedExecuteJob() throws LoadException { sparkResource, brokerDesc); task.init(); idToTasks.put(task.getSignature(), task); - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index dbf5247a435c02..7445e2b4e9e0ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -57,6 +57,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion; import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType; import org.apache.doris.transaction.TransactionState; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -88,7 +89,7 @@ public class SparkLoadPendingTask extends LoadTask { public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, SparkResource resource, BrokerDesc brokerDesc) { - super(loadTaskCallback); + super(loadTaskCallback, TaskType.PENDING); this.retryTime = 3; this.attachment = new SparkPendingTaskAttachment(signature); this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 7f263d7a1615a0..c3c64d13edd2ab 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -61,6 +61,7 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -209,7 +210,7 @@ public void testExecute(@Mocked Catalog catalog, @Mocked SparkLoadPendingTask pe pendingTask.init(); pendingTask.getSignature(); result = pendingTaskId; - catalog.getLoadTaskScheduler(); + catalog.getPendingLoadTaskScheduler(); result = executor; executor.submit((SparkLoadPendingTask) any); result = true; From 3a136fdc4afedf4e18c3a78ed6d749763633aa52 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 9 Nov 2020 15:36:14 +0800 Subject: [PATCH 3/3] 3 --- .../org/apache/doris/load/loadv2/BrokerLoadPendingTask.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 7cfa842ba2e1d2..f50c80c123b205 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -57,12 +57,6 @@ public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback, void executeTask() throws UserException { LOG.info("begin to execute broker pending task. job: {}", callback.getCallbackId()); getAllFileStatus(); - - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - e.printStackTrace(); - } } private void getAllFileStatus() throws UserException {