From fb67c1392629a3d6e308be0c264d09ed25f9f62c Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Fri, 15 Mar 2024 16:10:22 +0800 Subject: [PATCH 1/2] self-adaption adjustment of timeout time --- .../doris/common/InternalErrorCode.java | 1 + .../load/routineload/RoutineLoadJob.java | 13 +++++++ .../load/routineload/RoutineLoadTaskInfo.java | 35 +++++++++++++++++++ .../doris/load/routineload/ScheduleRule.java | 4 ++- 4 files changed, 52 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java index 944f8e0486057e..10173e130e5a35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java @@ -35,6 +35,7 @@ public enum InternalErrorCode { TOO_MANY_FAILURE_ROWS_ERR(102), CREATE_TASKS_ERR(103), TASKS_ABORT_ERR(104), + TOO_MANY_TIMEOUT(106), // for MoW table DELETE_BITMAP_LOCK_ERR(301); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 8760dc4b71cf1b..376d8be0764815 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -727,6 +727,18 @@ public void processTimeoutTasks() { // and after renew, the previous task is removed from routineLoadTaskInfoList, // so task can no longer be committed successfully. // the already committed task will not be handled here. + int timeoutCount = routineLoadTaskInfo.getTimeoutCount(); + if (timeoutCount > RoutineLoadTaskInfo.MAX_TIMEOUT_COUNT) { + try { + updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TOO_MANY_TIMEOUT, + "task " + routineLoadTaskInfo.getId() + " timeout too many time"), false); + } catch (UserException e) { + LOG.warn("update job state to pause failed", e); + } + return; + } + routineLoadTaskInfo.setTimeoutCount(timeoutCount + 1); + routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1)); RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo); Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask); } @@ -1252,6 +1264,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) { // step2: update job progress updateProgress(rlTaskTxnCommitAttachment); + routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment); } if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 93ec573717c8d5..cc14ab2a418ba9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -72,6 +72,9 @@ public abstract class RoutineLoadTaskInfo { protected boolean isMultiTable = false; + protected static final int MAX_TIMEOUT_COUNT = 3; + protected int timeoutCount = 0; + // this status will be set when corresponding transaction's status is changed. // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; @@ -130,6 +133,10 @@ public void setLastScheduledTime(long lastScheduledTime) { this.lastScheduledTime = lastScheduledTime; } + public void setTimeoutMs(long timeoutMs) { + this.timeoutMs = timeoutMs; + } + public long getTimeoutMs() { return timeoutMs; } @@ -142,6 +149,14 @@ public TransactionStatus getTxnStatus() { return txnStatus; } + public void setTimeoutCount(int timeoutCount) { + this.timeoutCount = timeoutCount; + } + + public int getTimeoutCount() { + return timeoutCount; + } + public boolean isTimeout() { if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) { // the corresponding txn is already finished, this task can not be treated as timeout. @@ -156,6 +171,26 @@ public boolean isTimeout() { return false; } + public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + int count = 0; + long leftInterval = this.timeoutMs >> 1; + long rightInterval = this.timeoutMs; + long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs(); + + while (count < timeoutCount) { + if (leftInterval <= taskExecutionTime && taskExecutionTime <= rightInterval) { + this.timeoutMs = rightInterval; + this.timeoutCount = 0; + return; + } + leftInterval = leftInterval >> 1; + rightInterval = rightInterval >> 1; + count++; + } + this.timeoutMs = rightInterval; + this.timeoutCount = 0; + } + abstract TRoutineLoadTask createRoutineLoadTask() throws UserException; // begin the txn of this task diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java index e2eaf4d825daf1..b0881d804e3d0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/ScheduleRule.java @@ -63,7 +63,9 @@ public static boolean isNeedAutoSchedule(RoutineLoadJob jobRoutine) { jobRoutine.id, jobRoutine.firstResumeTimestamp, jobRoutine.autoResumeCount, jobRoutine.pauseReason == null ? "null" : jobRoutine.pauseReason.getCode().name()); } - if (jobRoutine.pauseReason != null && jobRoutine.pauseReason.getCode() == InternalErrorCode.REPLICA_FEW_ERR) { + if (jobRoutine.pauseReason != null + && (jobRoutine.pauseReason.getCode() == InternalErrorCode.REPLICA_FEW_ERR + || jobRoutine.pauseReason.getCode() == InternalErrorCode.TOO_MANY_TIMEOUT)) { int dead = deadBeCount(); if (dead > Config.max_tolerable_backend_down_num) { if (LOG.isDebugEnabled()) { From 9157238f838cca5469fbf6d592662bd935b64107 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Mon, 18 Mar 2024 17:46:44 +0800 Subject: [PATCH 2/2] update --- .../doris/common/InternalErrorCode.java | 2 +- .../load/routineload/RoutineLoadJob.java | 10 +++--- .../load/routineload/RoutineLoadTaskInfo.java | 31 ++++++++----------- 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java index 885e8e887c72a8..d1e2cff13ad507 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java @@ -35,8 +35,8 @@ public enum InternalErrorCode { TOO_MANY_FAILURE_ROWS_ERR(102), CREATE_TASKS_ERR(103), TASKS_ABORT_ERR(104), - TOO_MANY_TIMEOUT(106), CANNOT_RESUME_ERR(105), + TIMEOUT_TOO_MUCH(106), // for MoW table DELETE_BITMAP_LOCK_ERR(301); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index e4354f2197843a..68f0e6345e1281 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -758,17 +758,17 @@ public void processTimeoutTasks() { // and after renew, the previous task is removed from routineLoadTaskInfoList, // so task can no longer be committed successfully. // the already committed task will not be handled here. - int timeoutCount = routineLoadTaskInfo.getTimeoutCount(); - if (timeoutCount > RoutineLoadTaskInfo.MAX_TIMEOUT_COUNT) { + int timeoutBackOffCount = routineLoadTaskInfo.getTimeoutBackOffCount(); + if (timeoutBackOffCount > RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) { try { - updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TOO_MANY_TIMEOUT, - "task " + routineLoadTaskInfo.getId() + " timeout too many time"), false); + updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH, + "task " + routineLoadTaskInfo.getId() + " timeout too much"), false); } catch (UserException e) { LOG.warn("update job state to pause failed", e); } return; } - routineLoadTaskInfo.setTimeoutCount(timeoutCount + 1); + routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1); routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1)); RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo); Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index cc14ab2a418ba9..2457cf01bcc979 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -72,8 +72,8 @@ public abstract class RoutineLoadTaskInfo { protected boolean isMultiTable = false; - protected static final int MAX_TIMEOUT_COUNT = 3; - protected int timeoutCount = 0; + protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3; + protected int timeoutBackOffCount = 0; // this status will be set when corresponding transaction's status is changed. // so that user or other logic can know the status of the corresponding txn. @@ -149,12 +149,12 @@ public TransactionStatus getTxnStatus() { return txnStatus; } - public void setTimeoutCount(int timeoutCount) { - this.timeoutCount = timeoutCount; + public void setTimeoutBackOffCount(int timeoutBackOffCount) { + this.timeoutBackOffCount = timeoutBackOffCount; } - public int getTimeoutCount() { - return timeoutCount; + public int getTimeoutBackOffCount() { + return timeoutBackOffCount; } public boolean isTimeout() { @@ -172,23 +172,18 @@ public boolean isTimeout() { } public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { - int count = 0; - long leftInterval = this.timeoutMs >> 1; - long rightInterval = this.timeoutMs; long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs(); + long timeoutMs = this.timeoutMs; - while (count < timeoutCount) { - if (leftInterval <= taskExecutionTime && taskExecutionTime <= rightInterval) { - this.timeoutMs = rightInterval; - this.timeoutCount = 0; + while (this.timeoutBackOffCount > 0) { + timeoutMs = timeoutMs >> 1; + if (timeoutMs <= taskExecutionTime) { + this.timeoutMs = timeoutMs << 1; return; } - leftInterval = leftInterval >> 1; - rightInterval = rightInterval >> 1; - count++; + this.timeoutBackOffCount--; } - this.timeoutMs = rightInterval; - this.timeoutCount = 0; + this.timeoutMs = timeoutMs; } abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;