Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ public class Catalog {

private TabletChecker tabletChecker;

private MasterTaskExecutor loadTaskScheduler;
// Thread pools for pending and loading task, separately
private MasterTaskExecutor pendingLoadTaskScheduler;
private MasterTaskExecutor loadingLoadTaskScheduler;

private LoadJobScheduler loadJobScheduler;

Expand Down Expand Up @@ -537,7 +539,10 @@ private Catalog(boolean isCheckpointCatalog) {
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);

this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog);
this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size,
Config.desired_max_waiting_jobs, !isCheckpointCatalog);
this.loadingLoadTaskScheduler = new MasterTaskExecutor("loading_load_task_scheduler", Config.async_load_task_pool_size,
Config.desired_max_waiting_jobs, !isCheckpointCatalog);
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = new LoadManager(loadJobScheduler);
this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
Expand Down Expand Up @@ -1238,9 +1243,6 @@ public void fixBugAfterMetadataReplayed(boolean waitCatalogReady) {
}
}
}

LOG.info("start to fix meta data bug");
loadManager.fixLoadJobMetaBugs(globalTransactionMgr);
}

// start all daemon threads only running on Master
Expand All @@ -1262,7 +1264,8 @@ private void startMasterOnlyDaemonThreads() {
LoadChecker.init(Config.load_checker_interval_second * 1000L);
LoadChecker.startAll();
// New load scheduler
loadTaskScheduler.start();
pendingLoadTaskScheduler.start();
loadingLoadTaskScheduler.start();
loadManager.prepareJobs();
loadJobScheduler.start();
loadTimeoutChecker.start();
Expand Down Expand Up @@ -4822,8 +4825,12 @@ public LoadManager getLoadManager() {
return loadManager;
}

public MasterTaskExecutor getLoadTaskScheduler() {
return loadTaskScheduler;
public MasterTaskExecutor getPendingLoadTaskScheduler() {
return pendingLoadTaskScheduler;
}

public MasterTaskExecutor getLoadingLoadTaskScheduler() {
return loadingLoadTaskScheduler;
}

public RoutineLoadManager getRoutineLoadManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -93,7 +93,7 @@ public void beginTxn()
protected void unprotectedExecuteJob() {
LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc);
idToTasks.put(task.getSignature(), task);
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task);
}

/**
Expand Down Expand Up @@ -170,9 +170,9 @@ private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) {

private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException {
// divide job into broker loading task by table
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
db.readLock();
try {
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) {
FileGroupAggKey aggKey = entry.getKey();
List<BrokerFileGroup> brokerFileGroups = entry.getValue();
Expand Down Expand Up @@ -208,13 +208,15 @@ brokerFileGroups, getDeadlineMs(), execMemLimit,
}
txnState.addTableIndexes(table);
}
// submit all tasks together
for (LoadTask loadTask : newLoadingTasks) {
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
}

} finally {
db.readUnlock();
}

// Submit task outside the database lock, cause it may take a while if task queue is full.
for (LoadTask loadTask : newLoadingTasks) {
Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask);
}
}

private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class BrokerLoadPendingTask extends LoadTask {
public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback,
Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
BrokerDesc brokerDesc) {
super(loadTaskCallback);
super(loadTaskCallback, TaskType.PENDING);
this.retryTime = 3;
this.attachment = new BrokerPendingTaskAttachment(signature);
this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
Expand All @@ -43,14 +44,14 @@
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand Down Expand Up @@ -235,8 +236,15 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
idToTasks.put(loadTask.getSignature(), loadTask);
// load id will be added to loadStatistic when executing this task
try {
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
} catch (RejectedExecutionException e) {
if (loadTask.getTaskType() == LoadTask.TaskType.PENDING) {
Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(loadTask);
} else if (loadTask.getTaskType() == LoadTask.TaskType.LOADING) {
Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask);
} else {
throw new LoadException(String.format("Unknown load task type: %s. task id: %d, job id, %d",
loadTask.getTaskType(), loadTask.getSignature(), id));
}
} catch (RejectedExecutionException | LoadException e) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public LoadLoadingTask(Database db, OlapTable table,
long jobDeadlineMs, long execMemLimit, boolean strictMode,
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS) {
super(callback);
super(callback, TaskType.LOADING);
this.db = db;
this.table = table;
this.brokerDesc = brokerDesc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -702,6 +703,8 @@ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId
}
}

@Deprecated
// Deprecated in version 0.12
// This method is only for bug fix. And should be call after image and edit log are replayed.
public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
for (LoadJob job : idToLoadJob.values()) {
Expand Down Expand Up @@ -738,11 +741,11 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
* replay process. This results in that when the FE restarts, these load jobs
* that should have been completed are re-entered into the pending state,
* resulting in repeated submission load tasks.
*
*
* Those wrong images are unrecoverable, so that we have to cancel all load jobs
* in PENDING or LOADING state when restarting FE, to avoid submit jobs
* repeatedly.
*
*
* This code can be remove when upgrading from 0.11.x to future version.
*/
if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) {
Expand Down Expand Up @@ -773,7 +776,7 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
// it would be failed if FE restart.
job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown"
+ ". label: {}, db: {}",
+ ". label: {}, db: {}",
job.getId(), prevState, job.getLabel(), job.getDbId());
} else {
// txn is not found. here are 2 cases:
Expand All @@ -784,7 +787,7 @@ public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
// removed by expiration).
// Without affecting the first case of job, we set the job finish time to be the same as the create time.
// In this way, the second type of job will be automatically cleared after running removeOldLoadJob();

// use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time
job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false);
LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,21 @@ public enum MergeType {
DELETE
}

public enum TaskType {
PENDING,
LOADING
}

private static final Logger LOG = LogManager.getLogger(LoadTask.class);

protected TaskType taskType;
protected LoadTaskCallback callback;
protected TaskAttachment attachment;
protected FailMsg failMsg = new FailMsg();
protected int retryTime = 1;

public LoadTask(LoadTaskCallback callback) {
public LoadTask(LoadTaskCallback callback, TaskType taskType) {
this.taskType = taskType;
this.signature = Catalog.getCurrentCatalog().getNextId();
this.callback = callback;
}
Expand Down Expand Up @@ -96,4 +103,8 @@ public void updateRetryInfo() {
this.retryTime--;
this.signature = Catalog.getCurrentCatalog().getNextId();
}

public TaskType getTaskType() {
return taskType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -196,7 +197,7 @@ protected void unprotectedExecuteJob() throws LoadException {
sparkResource, brokerDesc);
task.init();
idToTasks.put(task.getSignature(), task);
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class SparkLoadPendingTask extends LoadTask {
public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
SparkResource resource, BrokerDesc brokerDesc) {
super(loadTaskCallback);
super(loadTaskCallback, TaskType.PENDING);
this.retryTime = 3;
this.attachment = new SparkPendingTaskAttachment(signature);
this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.doris.task;

import org.apache.doris.common.ThreadPoolManager;

import com.google.common.collect.Maps;

import org.apache.doris.common.ThreadPoolManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -45,6 +46,12 @@ public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric
scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric);
}

public MasterTaskExecutor(String name, int threadNum, int queueSize, boolean needRegisterMetric) {
executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, queueSize, name + "_pool", needRegisterMetric);
runningTasks = Maps.newHashMap();
scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric);
}

public void start() {
scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

Expand Down Expand Up @@ -209,7 +210,7 @@ public void testExecute(@Mocked Catalog catalog, @Mocked SparkLoadPendingTask pe
pendingTask.init();
pendingTask.getSignature();
result = pendingTaskId;
catalog.getLoadTaskScheduler();
catalog.getPendingLoadTaskScheduler();
result = executor;
executor.submit((SparkLoadPendingTask) any);
result = true;
Expand Down