diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index b60a21c8459792..d89f93f887c5ae 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -37,6 +37,7 @@ import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.SmallFileMgr.SmallFile; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -56,8 +57,6 @@ import java.util.Map; import java.util.UUID; -import static org.apache.doris.analysis.CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS; - /** * KafkaRoutineLoadJob is a kind of RoutineLoadJob which fetch data from kafka. * The progress which is super class property is seems like "{"partition1": offset1, "partition2": offset2}" @@ -132,8 +131,8 @@ private void convertCustomProperties() throws DdlException { convertedCustomProperties.put(entry.getKey(), entry.getValue()); } } - if (convertedCustomProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) { - kafkaDefaultOffSet = convertedCustomProperties.remove(KAFKA_DEFAULT_OFFSETS); + if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) { + kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); } } @@ -189,15 +188,26 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return currentTaskConcurrentNum; } - // partitionIdToOffset must be not empty when loaded rows > 0 - // situation1: be commit txn but fe throw error when committing txn, - // fe rollback txn without partitionIdToOffset by itself - // this task should not be commit - // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated + // case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found), + // after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0. + // In this case, FE should not update the progress. + // + // case2: partitionIdToOffset must be not empty when loaded rows > 0 + // be commit txn but fe throw error when committing txn, + // fe rollback txn without partitionIdToOffset by itself + // this task should not be commit + // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated @Override - protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, + TransactionStatus txnStatus) { + if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) { + // case 1 + return false; + } + if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) { + // case 2 LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) .add("job_id", id) .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows()) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 29d4c6ed1fa4a6..77f5a552b6bab7 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -776,7 +776,7 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact + " maybe task was aborted by master when timeout") .build()); } - } else if (checkCommitInfo(rlTaskTxnCommitAttachment)) { + } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) { // step2: update job progress updateProgress(rlTaskTxnCommitAttachment); } @@ -974,7 +974,8 @@ public void setOrigStmt(String origStmt) { } // check the correctness of commit info - protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment); + protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, + TransactionStatus txnStatus); protected abstract String getStatistic();