diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index de1cf5096d2a7e..dbddc695a68904 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -58,6 +58,7 @@ public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map partitionId kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(), kafkaTaskInfo.getBeId(), isMultiTable); this.partitionIdToOffset = partitionIdToOffset; + this.isEof = kafkaTaskInfo.getIsEof(); } public List getPartitions() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 57aa84e773137d..3a1c35bd10f663 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1247,7 +1247,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) { // step2: update job progress updateProgress(rlTaskTxnCommitAttachment); - routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment); + routineLoadTaskInfo.handleTaskByTxnCommitAttachment(rlTaskTxnCommitAttachment); } if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index ae2570224c40ac..69c07507487983 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -77,6 +77,8 @@ public abstract class RoutineLoadTaskInfo { protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3; protected int timeoutBackOffCount = 0; + protected boolean isEof = false; + // this status will be set when corresponding transaction's status is changed. // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; @@ -167,6 +169,10 @@ public int getTimeoutBackOffCount() { return timeoutBackOffCount; } + public boolean getIsEof() { + return isEof; + } + public boolean isTimeout() { if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) { // the corresponding txn is already finished, this task can not be treated as timeout. @@ -181,7 +187,12 @@ public boolean isTimeout() { return false; } - public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + selfAdaptTimeout(rlTaskTxnCommitAttachment); + judgeEof(rlTaskTxnCommitAttachment); + } + + private void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs(); long timeoutMs = this.timeoutMs; @@ -196,6 +207,15 @@ public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment this.timeoutMs = timeoutMs; } + private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); + if (rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows() + && rlTaskTxnCommitAttachment.getReceivedBytes() < routineLoadJob.getMaxBatchSizeBytes() + && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < this.timeoutMs) { + this.isEof = true; + } + } + abstract TRoutineLoadTask createRoutineLoadTask() throws UserException; // begin the txn of this task diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index d42b600fb1d869..f76cf6205fe303 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -101,11 +101,15 @@ private void process() throws UserException, InterruptedException { try { // This step will be blocked when queue is empty RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take(); - if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime() - < routineLoadTaskInfo.getTimeoutMs()) { - // try to delay scheduling this task for 'timeout', to void too many failure - needScheduleTasksQueue.addLast(routineLoadTaskInfo); - return; + // try to delay scheduling tasks that are perceived as Eof to MaxBatchInterval + // to avoid to much small transaction + if (routineLoadTaskInfo.getIsEof()) { + RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId()); + if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime() + < routineLoadJob.getMaxBatchIntervalS()) { + needScheduleTasksQueue.addLast(routineLoadTaskInfo); + return; + } } scheduleOneTask(routineLoadTaskInfo); } catch (Exception e) { @@ -114,6 +118,7 @@ private void process() throws UserException, InterruptedException { } private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception { + routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis()); if (LOG.isDebugEnabled()) { LOG.debug("schedule routine load task info {} for job {}", routineLoadTaskInfo.id, routineLoadTaskInfo.getJobId());