From 4ab2e55c1a3c3258b413ec752cf2f9c1999e00c1 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 19 Sep 2019 20:12:13 +0800 Subject: [PATCH 1/2] Fix bug where routine load may mistakenly skip some data Reproduce: 1. Start a routine load and send a routine load task to BE. 2. BE executes the task successfully and commits it to FE. 3. The commit request fails on FE because the database was renamed (a "db not found" exception is thrown). 4. After the commit fails, BE sends a rollback request to FE. 5. FE receives this rollback request and mistakenly updates the routine load progress, because the number of loaded rows in the rollback request's attachment is larger than 0. --- .../load/routineload/KafkaRoutineLoadJob.java | 28 +++++++++++++------ .../load/routineload/RoutineLoadJob.java | 5 ++-- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index b60a21c8459792..4b3c846050f47e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -17,6 +17,8 @@ package org.apache.doris.load.routineload; +import static org.apache.doris.analysis.CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS; + import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -37,6 +39,7 @@ import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.SmallFileMgr.SmallFile; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -56,8 +59,6 @@ import java.util.Map; import java.util.UUID; -import static org.apache.doris.analysis.CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS; - /** * KafkaRoutineLoadJob is a kind of 
RoutineLoadJob which fetch data from kafka. * The progress which is super class property is seems like "{"partition1": offset1, "partition2": offset2}" @@ -189,15 +190,26 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return currentTaskConcurrentNum; } - // partitionIdToOffset must be not empty when loaded rows > 0 - // situation1: be commit txn but fe throw error when committing txn, - // fe rollback txn without partitionIdToOffset by itself - // this task should not be commit - // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated + // case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found), + // after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0. + // In this case, FE should not update the progress. + // + // case2: partitionIdToOffset must be not empty when loaded rows > 0 + // be commit txn but fe throw error when committing txn, + // fe rollback txn without partitionIdToOffset by itself + // this task should not be commit + // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated @Override - protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, + TransactionStatus txnStatus) { + if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) { + // case 1 + return false; + } + if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) { + // case 2 LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) .add("job_id", id) .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows()) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 29d4c6ed1fa4a6..77f5a552b6bab7 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -776,7 +776,7 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact + " maybe task was aborted by master when timeout") .build()); } - } else if (checkCommitInfo(rlTaskTxnCommitAttachment)) { + } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) { // step2: update job progress updateProgress(rlTaskTxnCommitAttachment); } @@ -974,7 +974,8 @@ public void setOrigStmt(String origStmt) { } // check the correctness of commit info - protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment); + protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, + TransactionStatus txnStatus); protected abstract String getStatistic(); From a8c67478413e95196ffc5bf03a80b2fdf73a16da Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 19 Sep 2019 20:17:45 +0800 Subject: [PATCH 2/2] remove import --- .../apache/doris/load/routineload/KafkaRoutineLoadJob.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 4b3c846050f47e..d89f93f887c5ae 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -17,8 +17,6 @@ package org.apache.doris.load.routineload; -import static org.apache.doris.analysis.CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS; - import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -133,8 
+131,8 @@ private void convertCustomProperties() throws DdlException { convertedCustomProperties.put(entry.getKey(), entry.getValue()); } } - if (convertedCustomProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) { - kafkaDefaultOffSet = convertedCustomProperties.remove(KAFKA_DEFAULT_OFFSETS); + if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) { + kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); } }