From a2fb1796b8a5864e160114d67cfc4f1818b742cb Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:34:03 +0800 Subject: [PATCH 1/2] [opt](routine-load) self-adaption backoff timeout (#32227) --- .../doris/common/InternalErrorCode.java | 3 +- .../load/routineload/RoutineLoadJob.java | 13 ++++++++ .../load/routineload/RoutineLoadTaskInfo.java | 30 +++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java index b871fd198cbd71..214f74a38f475e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java @@ -35,7 +35,8 @@ public enum InternalErrorCode { TOO_MANY_FAILURE_ROWS_ERR(102), CREATE_TASKS_ERR(103), TASKS_ABORT_ERR(104), - CANNOT_RESUME_ERR(105); + CANNOT_RESUME_ERR(105), + TIMEOUT_TOO_MUCH(106); private long errCode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index bb271a08d6db01..233d05d28b0103 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -741,6 +741,18 @@ public void processTimeoutTasks() { // and after renew, the previous task is removed from routineLoadTaskInfoList, // so task can no longer be committed successfully. // the already committed task will not be handled here. + int timeoutBackOffCount = routineLoadTaskInfo.getTimeoutBackOffCount(); + if (timeoutBackOffCount > RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) { + try { + updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH, + "task " + routineLoadTaskInfo.getId() + " timeout too much"), false); + } catch (UserException e) { + LOG.warn("update job state to pause failed", e); + } + return; + } + routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1); + routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1)); RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo); Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask); } @@ -1267,6 +1279,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) { // step2: update job progress updateProgress(rlTaskTxnCommitAttachment); + routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment); } if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 93ec573717c8d5..2457cf01bcc979 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -72,6 +72,9 @@ public abstract class RoutineLoadTaskInfo { protected boolean isMultiTable = false; + protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3; + protected int timeoutBackOffCount = 0; + // this status will be set when corresponding transaction's status is changed. // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; @@ -130,6 +133,10 @@ public void setLastScheduledTime(long lastScheduledTime) { this.lastScheduledTime = lastScheduledTime; } + public void setTimeoutMs(long timeoutMs) { + this.timeoutMs = timeoutMs; + } + public long getTimeoutMs() { return timeoutMs; } @@ -142,6 +149,14 @@ public TransactionStatus getTxnStatus() { return txnStatus; } + public void setTimeoutBackOffCount(int timeoutBackOffCount) { + this.timeoutBackOffCount = timeoutBackOffCount; + } + + public int getTimeoutBackOffCount() { + return timeoutBackOffCount; + } + public boolean isTimeout() { if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) { // the corresponding txn is already finished, this task can not be treated as timeout. @@ -156,6 +171,21 @@ public boolean isTimeout() { return false; } + public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs(); + long timeoutMs = this.timeoutMs; + + while (this.timeoutBackOffCount > 0) { + timeoutMs = timeoutMs >> 1; + if (timeoutMs <= taskExecutionTime) { + this.timeoutMs = timeoutMs << 1; + return; + } + this.timeoutBackOffCount--; + } + this.timeoutMs = timeoutMs; + } + abstract TRoutineLoadTask createRoutineLoadTask() throws UserException; // begin the txn of this task From 4c50c44dd1faa5209d2eb198e1b29d28872ee5d1 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Fri, 22 Mar 2024 16:27:33 +0800 Subject: [PATCH 2/2] [fix](routine-load) fix timeout backoff can not work (#32661) --- .../load/routineload/KafkaRoutineLoadJob.java | 2 +- .../doris/load/routineload/KafkaTaskInfo.java | 18 +++++++++++++++--- .../load/routineload/RoutineLoadTaskInfo.java | 14 ++++++++------ .../routineload/KafkaRoutineLoadJobTest.java | 2 +- .../RoutineLoadTaskSchedulerTest.java | 2 +- .../transaction/GlobalTransactionMgrTest.java | 4 ++-- 6 files changed, 28 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 24929520ecff98..bdcfb9e4a2724a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -227,7 +227,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition)); } KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, - maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable()); + maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable()); routineLoadTaskInfoList.add(kafkaTaskInfo); result.add(kafkaTaskInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index d8b79d9bdce77f..86a084764ea694 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -48,14 +48,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private Map partitionIdToOffset; public KafkaTaskInfo(UUID id, long jobId, - long timeoutMs, Map partitionIdToOffset, boolean isMultiTable) { - super(id, jobId, timeoutMs, isMultiTable); + long timeoutMs, int timeoutBackOffCount, + Map partitionIdToOffset, boolean isMultiTable) { + super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable); this.partitionIdToOffset = partitionIdToOffset; } public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map partitionIdToOffset, boolean isMultiTable) { super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), - kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable); + kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(), + kafkaTaskInfo.getBeId(), isMultiTable); this.partitionIdToOffset = partitionIdToOffset; } @@ -131,6 +133,11 @@ private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws Use TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); + // it needs update timeout to make task timeout backoff work + long timeoutS = this.getTimeoutMs() / 1000; + tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS); + tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); + tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); long wgId = routineLoadJob.getWorkloadId(); List tWgList = new ArrayList<>(); @@ -153,6 +160,11 @@ private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob) TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); + // it needs update timeout to make task timeout backoff work + long timeoutS = this.getTimeoutMs() / 1000; + tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS); + tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); + tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); long wgId = routineLoadJob.getWorkloadId(); List tWgList = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 2457cf01bcc979..cdee942f40833e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -79,17 +79,19 @@ public abstract class RoutineLoadTaskInfo { // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; - public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable) { + public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, + int timeoutBackOffCount, boolean isMultiTable) { this.id = id; this.jobId = jobId; this.createTimeMs = System.currentTimeMillis(); this.timeoutMs = timeoutMs; + this.timeoutBackOffCount = timeoutBackOffCount; this.isMultiTable = isMultiTable; } - public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long previousBeId, - boolean isMultiTable) { - this(id, jobId, timeoutMs, isMultiTable); + public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int timeoutBackOffCount, + long previousBeId, boolean isMultiTable) { + this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable); this.previousBeId = previousBeId; } @@ -164,8 +166,8 @@ public boolean isTimeout() { } if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > timeoutMs) { - LOG.info("task {} is timeout. start: {}, timeout: {}", DebugUtil.printId(id), - executeStartTimeMs, timeoutMs); + LOG.info("task {} is timeout. start: {}, timeout: {}, timeoutBackOffCount: {}", DebugUtil.printId(id), + executeStartTimeMs, timeoutMs, timeoutBackOffCount); return true; } return false; diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 79786fe8be9653..ba3c9ee626a763 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -221,7 +221,7 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans Map partitionIdsToOffset = Maps.newHashMap(); partitionIdsToOffset.put(100, 0L); KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, - maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false); + maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false); kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1); routineLoadTaskInfoList.add(kafkaTaskInfo); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index c1f5731329fbad..c5bf509464e112 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -68,7 +68,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset); LinkedBlockingDeque routineLoadTaskInfoQueue = new LinkedBlockingDeque<>(); - KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, 0, partitionIdToOffset, false); routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 0203d614e00ef9..d9e2088e5994a2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -318,7 +318,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); - KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0, partitionIdToOffset, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); @@ -390,7 +390,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); - KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0, partitionIdToOffset, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo);