Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
Expand All @@ -56,8 +57,6 @@
import java.util.Map;
import java.util.UUID;

import static org.apache.doris.analysis.CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS;

/**
* KafkaRoutineLoadJob is a kind of RoutineLoadJob which fetch data from kafka.
* The progress which is super class property is seems like "{"partition1": offset1, "partition2": offset2}"
Expand Down Expand Up @@ -132,8 +131,8 @@ private void convertCustomProperties() throws DdlException {
convertedCustomProperties.put(entry.getKey(), entry.getValue());
}
}
if (convertedCustomProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) {
kafkaDefaultOffSet = convertedCustomProperties.remove(KAFKA_DEFAULT_OFFSETS);
if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) {
kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
}
}

Expand Down Expand Up @@ -189,15 +188,26 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
return currentTaskConcurrentNum;
}

// partitionIdToOffset must be not empty when loaded rows > 0
// situation1: be commit txn but fe throw error when committing txn,
// fe rollback txn without partitionIdToOffset by itself
// this task should not be commit
// otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
// case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found),
// after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0.
// In this case, FE should not update the progress.
//
// case2: partitionIdToOffset must be not empty when loaded rows > 0
// be commit txn but fe throw error when committing txn,
// fe rollback txn without partitionIdToOffset by itself
// this task should not be commit
// otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
@Override
protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
TransactionStatus txnStatus) {
if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) {
// case 1
return false;
}

if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
&& (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) {
// case 2
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
.add("job_id", id)
.add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact
+ " maybe task was aborted by master when timeout")
.build());
}
} else if (checkCommitInfo(rlTaskTxnCommitAttachment)) {
} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) {
// step2: update job progress
updateProgress(rlTaskTxnCommitAttachment);
}
Expand Down Expand Up @@ -974,7 +974,8 @@ public void setOrigStmt(String origStmt) {
}

// check the correctness of commit info
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment);
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
TransactionStatus txnStatus);

protected abstract String getStatistic();

Expand Down