diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java index 86afe8234bd4da..d1e2cff13ad507 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java @@ -36,6 +36,7 @@ public enum InternalErrorCode { CREATE_TASKS_ERR(103), TASKS_ABORT_ERR(104), CANNOT_RESUME_ERR(105), + TIMEOUT_TOO_MUCH(106), // for MoW table DELETE_BITMAP_LOCK_ERR(301); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 77e9e9b9eff52f..68f0e6345e1281 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -758,6 +758,18 @@ public void processTimeoutTasks() { // and after renew, the previous task is removed from routineLoadTaskInfoList, // so task can no longer be committed successfully. // the already committed task will not be handled here. + int timeoutBackOffCount = routineLoadTaskInfo.getTimeoutBackOffCount(); + if (timeoutBackOffCount > RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) { + try { + updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH, + "task " + routineLoadTaskInfo.getId() + " timeout too much"), false); + } catch (UserException e) { + LOG.warn("update job state to pause failed", e); + } + return; + } + routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1); + routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1)); RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo); Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask); } @@ -1306,6 +1318,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) { // step2: update job progress updateProgress(rlTaskTxnCommitAttachment); + routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment); } if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 93ec573717c8d5..2457cf01bcc979 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -72,6 +72,9 @@ public abstract class RoutineLoadTaskInfo { protected boolean isMultiTable = false; + protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3; + protected int timeoutBackOffCount = 0; + // this status will be set when corresponding transaction's status is changed. // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; @@ -130,6 +133,10 @@ public void setLastScheduledTime(long lastScheduledTime) { this.lastScheduledTime = lastScheduledTime; } + public void setTimeoutMs(long timeoutMs) { + this.timeoutMs = timeoutMs; + } + public long getTimeoutMs() { return timeoutMs; } @@ -142,6 +149,14 @@ public TransactionStatus getTxnStatus() { return txnStatus; } + public void setTimeoutBackOffCount(int timeoutBackOffCount) { + this.timeoutBackOffCount = timeoutBackOffCount; + } + + public int getTimeoutBackOffCount() { + return timeoutBackOffCount; + } + public boolean isTimeout() { if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) { // the corresponding txn is already finished, this task can not be treated as timeout. @@ -156,6 +171,21 @@ public boolean isTimeout() { return false; } + public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs(); + long timeoutMs = this.timeoutMs; + + while (this.timeoutBackOffCount > 0) { + timeoutMs = timeoutMs >> 1; + if (timeoutMs <= taskExecutionTime) { + this.timeoutMs = timeoutMs << 1; + return; + } + this.timeoutBackOffCount--; + } + this.timeoutMs = timeoutMs; + } + abstract TRoutineLoadTask createRoutineLoadTask() throws UserException; // begin the txn of this task