Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,15 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
}

// partitionIdToOffset must be not empty when loaded rows > 0
// situation1: be commit txn but fe throw error when committing txn,
// fe rollback txn without partitionIdToOffset by itself
// this task should not be commit
// otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
@Override
boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
&& ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().size() == 0) {
&& ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().isEmpty()) {
LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
.add("job_id", id)
.add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
Expand Down Expand Up @@ -223,7 +228,13 @@ protected boolean unprotectNeedReschedule() {
try {
newCurrentKafkaPartition = getAllKafkaPartitions();
} catch (Exception e) {
LOG.warn("Job {} failed to fetch all current partition", id);
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage())
.build(), e);
if (this.state == JobState.NEED_SCHEDULE) {
unprotectUpdateState(JobState.PAUSED,
"Job failed to fetch all current partition with error " + e.getMessage());
}
return false;
}
if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) {
Expand Down
141 changes: 90 additions & 51 deletions fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public boolean isFinalState() {
protected RoutineLoadProgress progress;
protected String pausedReason;
protected String cancelReason;
protected long endTimestamp;

// currentErrorNum and currentTotalNum will be update
// when currentTotalNum is more then ten thousand or currentErrorNum is more then maxErrorNum
Expand All @@ -160,6 +161,7 @@ public RoutineLoadJob(String name, long dbId, long tableId, LoadDataSourceType d
this.state = JobState.NEED_SCHEDULE;
this.dataSourceType = dataSourceType;
this.resourceInfo = ConnectContext.get().toResourceCtx();
this.endTimestamp = -1;
this.authCode = new StringBuilder().append(ConnectContext.get().getQualifiedUser())
.append(ConnectContext.get().getRemoteIP())
.append(id).append(System.currentTimeMillis()).toString().hashCode();
Expand All @@ -183,6 +185,7 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId,
this.dataSourceType = dataSourceType;
this.maxErrorNum = maxErrorNum;
this.resourceInfo = ConnectContext.get().toResourceCtx();
this.endTimestamp = -1;
this.routineLoadTaskInfoList = new ArrayList<>();
lock = new ReentrantReadWriteLock(true);
}
Expand Down Expand Up @@ -254,6 +257,10 @@ public long getAuthCode() {
return authCode;
}

public long getEndTimestamp() {
return endTimestamp;
}

// this is a unprotected method which is called in the initialization function
protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) throws LoadException {
if (this.routineLoadDesc != null) {
Expand Down Expand Up @@ -343,31 +350,36 @@ public TExecPlanFragmentParams gettExecPlanFragmentParams() {
// only check loading task
public List<RoutineLoadTaskInfo> processTimeoutTasks() {
List<RoutineLoadTaskInfo> result = new ArrayList<>();
List<RoutineLoadTaskInfo> timeoutTaskList = new ArrayList<>();
writeLock();
try {
List<RoutineLoadTaskInfo> runningTasks = new ArrayList<>(routineLoadTaskInfoList);
for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
if ((routineLoadTaskInfo.getLoadStartTimeMs() != 0L)
&& ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs())
> DEFAULT_TASK_TIMEOUT_SECONDS * 1000)) {
UUID oldTaskId = routineLoadTaskInfo.getId();
// abort txn if not committed
try {
Catalog.getCurrentGlobalTransactionMgr()
.abortTransaction(routineLoadTaskInfo.getTxnId(), "routine load task of txn was timeout");
} catch (UserException e) {
if (e.getMessage().contains("committed")) {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, oldTaskId)
.add("msg", "txn of task has been committed when checking timeout")
.build());
continue;
}
}
timeoutTaskList.add(routineLoadTaskInfo);
}
}
} finally {
writeUnlock();
}

for (RoutineLoadTaskInfo routineLoadTaskInfo : timeoutTaskList) {
UUID oldTaskId = routineLoadTaskInfo.getId();
// abort txn if not committed
try {
Catalog.getCurrentGlobalTransactionMgr()
.abortTransaction(routineLoadTaskInfo.getTxnId(), "routine load task of txn was timeout");
} catch (UserException e) {
if (e.getMessage().contains("committed")) {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, oldTaskId)
.add("msg", "txn of task has been committed when checking timeout")
.build(), e);
continue;
}
}
}
return result;
}

Expand Down Expand Up @@ -510,6 +522,10 @@ public void plan() throws UserException {
@Override
public void beforeAborted(TransactionState txnState, String txnStatusChangeReason)
throws AbortTransactionException {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_state", txnState)
.add("msg", "task before aborted")
.build());
readLock();
try {
String taskId = txnState.getLabel();
Expand All @@ -526,6 +542,10 @@ public void beforeAborted(TransactionState txnState, String txnStatusChangeReaso

@Override
public void beforeCommitted(TransactionState txnState) throws TransactionException {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_state", txnState)
.add("msg", "task before committed")
.build());
readLock();
try {
// check if task has been aborted
Expand Down Expand Up @@ -578,25 +598,38 @@ public void onCommitted(TransactionState txnState) throws TransactionException {
// txn will be aborted but progress will be update
// progress will be update otherwise the progress will be hung
@Override
public void onAborted(TransactionState txnState, String txnStatusChangeReason) {
if (txnStatusChangeReason != null) {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_id", txnState.getTransactionId())
.add("msg", "txn abort with reason " + txnStatusChangeReason)
.build());
} else {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_id", txnState.getTransactionId())
.add("msg", "txn abort").build());
}
public void onAborted(TransactionState txnState, String txnStatusChangeReasonString) {
writeLock();
try {
// step0: find task in job
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
routineLoadTaskInfoList.parallelStream()
.filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst();
if (routineLoadTaskInfoOptional.isPresent()) {
// todo(ml): use previous be id depend on change reason
// step1: job state will be changed depending on txnStatusChangeReasonString
if (txnStatusChangeReasonString != null) {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_id", txnState.getTransactionId())
.add("msg", "txn abort with reason " + txnStatusChangeReasonString)
.build());
TransactionState.TxnStatusChangeReason txnStatusChangeReason =
TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
if (txnStatusChangeReason != null) {
switch (txnStatusChangeReason) {
case OFFSET_OUT_OF_RANGE:
updateState(JobState.CANCELLED, txnStatusChangeReason.toString());
return;
default:
break;
}
}
// todo(ml): use previous be id depend on change reason
} else {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_id", txnState.getTransactionId())
.add("msg", "txn abort").build());
}
// step2: commit task , update progress, maybe create a new task
executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
} else {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
Expand Down Expand Up @@ -686,37 +719,41 @@ public void updateState(JobState jobState) {
public void updateState(JobState jobState, String reason) {
writeLock();
try {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_job_state", getState())
.add("desire_job_state", jobState)
.add("msg", "job will be change to desire state")
.build());
checkStateTransform(jobState);
switch (jobState) {
case PAUSED:
executePause(reason);
break;
case NEED_SCHEDULE:
executeNeedSchedule();
break;
case STOPPED:
executeStop();
break;
case CANCELLED:
executeCancel(reason);
break;
default:
break;
}
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_job_state", getState())
.add("msg", "job state has been changed")
.build());
unprotectUpdateState(jobState, reason);
} finally {
writeUnlock();
}
}

protected void unprotectUpdateState(JobState jobState, String reason) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_job_state", getState())
.add("desire_job_state", jobState)
.add("msg", "job will be change to desire state")
.build());
checkStateTransform(jobState);
switch (jobState) {
case PAUSED:
executePause(reason);
break;
case NEED_SCHEDULE:
executeNeedSchedule();
break;
case STOPPED:
executeStop();
break;
case CANCELLED:
executeCancel(reason);
break;
default:
break;
}
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_job_state", getState())
.add("msg", "job state has been changed")
.build());
}

private void executePause(String reason) {
// TODO(ml): edit log
// remove all of task in jobs and change job state to paused
Expand All @@ -735,12 +772,14 @@ private void executeStop() {
// TODO(ml): edit log
state = JobState.STOPPED;
routineLoadTaskInfoList.clear();
endTimestamp = System.currentTimeMillis();
}

private void executeCancel(String reason) {
cancelReason = reason;
state = JobState.CANCELLED;
routineLoadTaskInfoList.clear();
endTimestamp = System.currentTimeMillis();
}

public void update() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,22 @@
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -83,7 +87,7 @@ private void writeUnlock() {
public RoutineLoadManager() {
idToRoutineLoadJob = Maps.newConcurrentMap();
dbToNameToRoutineLoadJob = Maps.newConcurrentMap();
beIdToMaxConcurrentTasks = Maps.newHashMap();
beIdToMaxConcurrentTasks = Maps.newConcurrentMap();
lock = new ReentrantReadWriteLock(true);
}

Expand Down Expand Up @@ -448,8 +452,29 @@ public void processTimeoutTasks() {
// Remove old routine load jobs from idToRoutineLoadJob
// This function is called periodically.
// Cancelled and stopped job will be remove after Configure.label_keep_max_second seconds
public void removeOldRoutineLoadJobs() {
// TODO(ml): remove old routine load job
public void cleanOldRoutineLoadJobs() {
writeLock();
try {
Iterator<Map.Entry<Long, RoutineLoadJob>> iterator = idToRoutineLoadJob.entrySet().iterator();
long currentTimestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
RoutineLoadJob routineLoadJob = iterator.next().getValue();
long jobEndTimestamp = routineLoadJob.getEndTimestamp();
if (jobEndTimestamp != -1L &&
((currentTimestamp - jobEndTimestamp) > Config.label_clean_interval_second * 1000)) {
dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()).get(routineLoadJob.getName()).remove(routineLoadJob);
iterator.remove();
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("end_timestamp", routineLoadJob.getEndTimestamp())
.add("current_timestamp", currentTimestamp)
.add("job_state", routineLoadJob.getState())
.add("msg", "old job has been cleaned")
);
}
}
} finally {
writeUnlock();
}
}

public void updateRoutineLoadJob() {
Expand Down
Loading