From ded832b53ba2c31b62595cc92a65f9b9bb2696b0 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 16 Mar 2019 17:26:48 +0800 Subject: [PATCH] modify the replay logic of routine load job --- .../doris/analysis/CreateRoutineLoadStmt.java | 8 +- .../doris/load/routineload/KafkaProgress.java | 30 +-- .../load/routineload/KafkaRoutineLoadJob.java | 28 ++- .../RLTaskTxnCommitAttachment.java | 15 +- .../load/routineload/RoutineLoadJob.java | 219 ++++++++++-------- .../load/routineload/RoutineLoadManager.java | 31 ++- .../load/routineload/RoutineLoadProgress.java | 44 ++++ .../transaction/GlobalTransactionMgr.java | 18 +- .../doris/transaction/TransactionState.java | 111 +++++---- .../transaction/TxnCommitAttachment.java | 42 ++++ .../transaction/TxnStateChangeListener.java | 28 +-- .../transaction/GlobalTransactionMgrTest.java | 4 +- 12 files changed, 355 insertions(+), 223 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 3e920ebfd503a7..d8b5bc9ebc1334 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -198,10 +198,10 @@ public void analyze(Analyzer analyzer) throws UserException { dbTableName.analyze(analyzer); // check load properties include column separator etc. checkLoadProperties(analyzer); - // check routine load properties include desired concurrent number etc. + // check routine load job properties include desired concurrent number etc. checkJobProperties(); - // check data load source properties - checkLoadSourceProperties(); + // check data source properties + checkDataSourceProperties(); } public void checkLoadProperties(Analyzer analyzer) throws UserException { @@ -274,7 +274,7 @@ private int getIntegetPropertyOrDefault(String propName, String hintMsg, int def return defaultVal; } - private void checkLoadSourceProperties() throws AnalysisException { + private void checkDataSourceProperties() throws AnalysisException { LoadDataSourceType type; try { type = LoadDataSourceType.valueOf(typeName); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 573cb42575ddb3..3eb722ca04ee83 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -40,9 +40,11 @@ public class KafkaProgress extends RoutineLoadProgress { private Map partitionIdToOffset = Maps.newHashMap(); public KafkaProgress() { + super(LoadDataSourceType.KAFKA); } public KafkaProgress(TKafkaRLTaskProgress tKafkaRLTaskProgress) { + super(LoadDataSourceType.KAFKA); this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionCmtOffset(); } @@ -58,6 +60,18 @@ public void setPartitionIdToOffset(Map partitionIdToOffset) { this.partitionIdToOffset = partitionIdToOffset; } + // (partition id, end offset) + // end offset = -1 while begin offset of partition is 0 + @Override + public String toString() { + Map showPartitionIdToOffset = new HashMap<>(); + for (Map.Entry entry : partitionIdToOffset.entrySet()) { + showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1); + } + return "KafkaProgress [partitionIdToOffset=" + + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]"; + } + @Override public void update(RoutineLoadProgress progress) { KafkaProgress newProgress = (KafkaProgress) progress; @@ -67,8 +81,9 @@ public void update(RoutineLoadProgress progress) { @Override public void write(DataOutput out) throws IOException { + super.write(out); out.writeInt(partitionIdToOffset.size()); - for (Map.Entry entry : partitionIdToOffset.entrySet()) { + for (Map.Entry entry : partitionIdToOffset.entrySet()) { out.writeInt((Integer) entry.getKey()); out.writeLong((Long) entry.getValue()); } @@ -76,22 +91,11 @@ public void write(DataOutput out) throws IOException { @Override public void readFields(DataInput in) throws IOException { + super.readFields(in); int size = in.readInt(); partitionIdToOffset = new HashMap<>(); for (int i = 0; i < size; i++) { partitionIdToOffset.put(in.readInt(), in.readLong()); } } - - // (partition id, end offset) - // end offset = -1 while begin offset of partition is 0 - @Override - public String toString() { - Map showPartitionIdToOffset = new HashMap<>(); - for (Map.Entry entry : partitionIdToOffset.entrySet()) { - showPartitionIdToOffset.put(entry.getKey(), entry.getValue() - 1); - } - return "KafkaProgress [partitionIdToOffset=" - + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]"; - } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 1d798a4eb3e960..888ac78991353d 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -61,7 +61,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class); - private static final int FETCH_PARTITIONS_TIMEOUT_SECOND = 10; + private static final int FETCH_PARTITIONS_TIMEOUT_SECOND = 5; private String brokerList; private String topic; @@ -177,8 +177,14 @@ boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { } @Override - protected void updateProgress(RLTaskTxnCommitAttachment attachment, boolean isReplay) { - super.updateProgress(attachment, isReplay); + protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + super.updateProgress(attachment); + this.progress.update(attachment.getProgress()); + } + + @Override + protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { + super.replayUpdateProgress(attachment); this.progress.update(attachment.getProgress()); } @@ -221,7 +227,8 @@ protected boolean unprotectNeedReschedule() { .build(), e); if (this.state == JobState.NEED_SCHEDULE) { unprotectUpdateState(JobState.PAUSED, - "Job failed to fetch all current partition with error " + e.getMessage(), false); + "Job failed to fetch all current partition with error " + e.getMessage(), + false /* not replay */); } return false; } @@ -257,8 +264,8 @@ protected boolean unprotectNeedReschedule() { private List getAllKafkaPartitions() { List result = new ArrayList<>(); - List partitionList = consumer.partitionsFor( - topic, Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT_SECOND)); + List partitionList = consumer.partitionsFor(topic, + Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT_SECOND)); for (PartitionInfo partitionInfo : partitionList) { result.add(partitionInfo.partition()); } @@ -271,8 +278,9 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr if (db == null) { ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, stmt.getDBTableName().getDb()); } - db.readLock(); + long tableId = -1L; + db.readLock(); try { unprotectedCheckMeta(db, stmt.getDBTableName().getTbl(), stmt.getRoutineLoadDesc()); tableId = db.getTable(stmt.getDBTableName().getTbl()).getId(); @@ -282,10 +290,8 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr // init kafka routine load job long id = Catalog.getInstance().getNextId(); - KafkaRoutineLoadJob kafkaRoutineLoadJob = - new KafkaRoutineLoadJob(id, stmt.getName(), db.getId(), tableId, - stmt.getKafkaBrokerList(), - stmt.getKafkaTopic()); + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), db.getId(), + tableId, stmt.getKafkaBrokerList(), stmt.getKafkaTopic()); kafkaRoutineLoadJob.setOptional(stmt); return kafkaRoutineLoadJob; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java index 2094fc94389c82..5006bd42ccba66 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java @@ -19,6 +19,7 @@ import org.apache.doris.thrift.TRLTaskTxnCommitAttachment; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TxnCommitAttachment; import java.io.DataInput; @@ -34,12 +35,13 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment { private long filteredRows; private long loadedRows; private RoutineLoadProgress progress; - private LoadDataSourceType loadDataSourceType; public RLTaskTxnCommitAttachment() { + super(TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK); } public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + super(TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK); this.jobId = rlTaskTxnCommitAttachment.getJobId(); this.taskId = rlTaskTxnCommitAttachment.getId(); this.filteredRows = rlTaskTxnCommitAttachment.getFilteredRows(); @@ -47,7 +49,6 @@ public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttac switch (rlTaskTxnCommitAttachment.getLoadSourceType()) { case KAFKA: - this.loadDataSourceType = LoadDataSourceType.KAFKA; this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress()); default: break; @@ -81,11 +82,17 @@ public String toString() { @Override public void write(DataOutput out) throws IOException { - // TODO: think twice + super.write(out); + out.writeLong(filteredRows); + out.writeLong(loadedRows); + progress.write(out); } @Override public void readFields(DataInput in) throws IOException { - // TODO: think twice + super.readFields(in); + filteredRows = in.readLong(); + loadedRows = in.readLong(); + progress = RoutineLoadProgress.read(in); } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0d619ff75801d1..4981ae6ad26cc9 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -41,11 +41,8 @@ import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.service.ExecuteEnv; -import org.apache.doris.service.FrontendServiceImpl; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; -import org.apache.doris.thrift.TLoadTxnCommitRequest; import org.apache.doris.transaction.AbortTransactionException; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionException; @@ -58,7 +55,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; import java.io.DataInput; import java.io.DataOutput; @@ -78,13 +74,13 @@ * The desireTaskConcurrentNum means that user expect the number of concurrent stream load * The routine load job support different streaming medium such as KAFKA */ -public abstract class RoutineLoadJob extends TxnStateChangeListener implements Writable { +public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable { private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class); private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; - private static final int BASE_OF_ERROR_RATE = 10000; - private static final int DEFAULT_MAX_ERROR_NUM = (int) (BASE_OF_ERROR_RATE * 0.5); + private static final int ERROR_SAMPLE_NUM = 1000 * 10000; + private static final int DEFAULT_MAX_ERROR_NUM = 0; private static final int DEFAULT_MAX_INTERVAL_SECOND = 5; private static final int DEFAULT_MAX_BATCH_ROWS = 100000; private static final int DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB @@ -125,6 +121,7 @@ public boolean isFinalState() { } } + protected long id; protected String name; protected long dbId; protected long tableId; @@ -147,10 +144,14 @@ public boolean isFinalState() { protected String cancelReason; protected long endTimestamp; - // currentErrorNum and currentTotalNum will be update - // when currentTotalNum is more then ten thousand or currentErrorNum is more then maxErrorNum - protected long currentErrorNum; - protected long currentTotalNum; + /* + * currentErrorRows and currentTotalRows is used for check error rate + * errorRows and totalRows are used for statistics + */ + protected long currentErrorRows; + protected long currentTotalRows; + protected long errorRows; + protected long totalRows; // The tasks belong to this job protected List routineLoadTaskInfoList = Lists.newArrayList(); @@ -172,12 +173,12 @@ public void setTypeRead(boolean isTypeRead) { } public RoutineLoadJob(long id, LoadDataSourceType type) { - super(id); + this.id = id; this.dataSourceType = type; } public RoutineLoadJob(Long id, String name, long dbId, long tableId, LoadDataSourceType dataSourceType) { - super(id); + this.id = id; this.name = name; this.dbId = dbId; this.tableId = tableId; @@ -194,7 +195,7 @@ public RoutineLoadJob(long id, String name, long dbId, long tableId, RoutineLoadDesc routineLoadDesc, int desireTaskConcurrentNum, LoadDataSourceType dataSourceType, int maxErrorNum) { - super(id); + this.id = id; this.name = name; this.dbId = dbId; this.tableId = tableId; @@ -224,6 +225,11 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { } } + @Override + public long getId() { + return id; + } + public void readLock() { lock.readLock().lock(); } @@ -425,12 +431,6 @@ public Map getBeIdToConcurrentTaskNum() { } } - // if rate of error data is more then max_filter_ratio, pause job - protected void updateProgress(RLTaskTxnCommitAttachment attachment, boolean isReplay) { - updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows() + attachment.getFilteredRows(), - isReplay); - } - public boolean containsTask(UUID taskId) { readLock(); try { @@ -458,19 +458,24 @@ private void checkStateTransform(RoutineLoadJob.JobState desireState) } } - private void loadTxnCommit(TLoadTxnCommitRequest request) throws TException { - FrontendServiceImpl frontendService = new FrontendServiceImpl(ExecuteEnv.getInstance()); - frontendService.loadTxnCommit(request); + // if rate of error data is more then max_filter_ratio, pause job + protected void updateProgress(RLTaskTxnCommitAttachment attachment) { + updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows() + attachment.getFilteredRows(), + false /* not replay */); } - private void updateNumOfData(long numOfErrorData, long numOfTotalData, boolean isReplay) { - currentErrorNum += numOfErrorData; - currentTotalNum += numOfTotalData; - if (currentTotalNum > BASE_OF_ERROR_RATE) { - if (currentErrorNum > maxErrorNum) { + private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean isReplay) { + totalRows += numOfTotalRows; + errorRows += numOfErrorRows; + + // check error rate + currentErrorRows += numOfErrorRows; + currentTotalRows += numOfTotalRows; + if (currentTotalRows > ERROR_SAMPLE_NUM) { + if (currentErrorRows > maxErrorNum) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_num", currentTotalNum) - .add("current_error_num", currentErrorNum) + .add("current_total_num", currentTotalRows) + .add("current_error_num", currentErrorRows) .add("max_error_num", maxErrorNum) .add("msg", "current error num is more then max error num, begin to pause job") .build()); @@ -479,35 +484,40 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData, boolean i } LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_num", currentTotalNum) - .add("current_error_num", currentErrorNum) + .add("current_total_num", currentTotalRows) + .add("current_error_num", currentErrorRows) .add("max_error_num", maxErrorNum) .add("msg", "reset current total num and current error num when current total num is more then base") .build()); // reset currentTotalNum and currentErrorNum - currentErrorNum = 0; - currentTotalNum = 0; - } else if (currentErrorNum > maxErrorNum) { + currentErrorRows = 0; + currentTotalRows = 0; + } else if (currentErrorRows > maxErrorNum) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_num", currentTotalNum) - .add("current_error_num", currentErrorNum) + .add("current_total_num", currentTotalRows) + .add("current_error_num", currentErrorRows) .add("max_error_num", maxErrorNum) .add("msg", "current error num is more then max error num, begin to pause job") .build()); // remove all of task in jobs and change job state to paused updateState(JobState.PAUSED, "current error num is more then max error num", isReplay); // reset currentTotalNum and currentErrorNum - currentErrorNum = 0; - currentTotalNum = 0; + currentErrorRows = 0; + currentTotalRows = 0; LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_total_num", currentTotalNum) - .add("current_error_num", currentErrorNum) + .add("current_total_num", currentTotalRows) + .add("current_error_num", currentErrorRows) .add("max_error_num", maxErrorNum) .add("msg", "reset current total num and current error num when current total num is more then max error num") .build()); } } + protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { + updateNumOfData(attachment.getFilteredRows(), attachment.getLoadedRows() + attachment.getFilteredRows(), + true /* is replay */); + } + abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException; @@ -569,27 +579,23 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti // the task is committed when the correct number of rows is more then 0 @Override - public void onCommitted(TransactionState txnState, boolean isReplay) throws TransactionException { + public ListenResult onCommitted(TransactionState txnState) throws TransactionException { + ListenResult result = ListenResult.UNCHANGED; writeLock(); try { - if (isReplay) { - // only update progress - if (txnState.getTxnCommitAttachment() != null) { - updateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(), isReplay); - } + // find task in job + Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter( + entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); + if (routineLoadTaskInfoOptional.isPresent()) { + executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); + result = ListenResult.CHANGED; } else { - // find task in job - Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter( - entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); - if (routineLoadTaskInfoOptional.isPresent()) { - executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); - } else { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id", - txnState.getTransactionId()).add("msg", "The task is not in task info list. " - + "Maybe task has been renew or job state has changed. Transaction will not be committed.").build()); - throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed" - + " while task " + txnState.getLabel() + "has been aborted "); - } + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id", + txnState.getTransactionId()).add("msg", + "The task is not in task info list. " + + "Maybe task has been renew or job state has changed. Transaction will not be committed.").build()); + throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed" + + " while task " + txnState.getLabel() + "has been aborted "); } } catch (TransactionException e) { LOG.warn(e.getMessage(), e); @@ -597,10 +603,16 @@ public void onCommitted(TransactionState txnState, boolean isReplay) throws Tran } catch (Throwable e) { LOG.warn(e.getMessage(), e); updateState(JobState.PAUSED, "failed to update offset when transaction " - + txnState.getTransactionId() + " has been committed", isReplay); + + txnState.getTransactionId() + " has been committed", false /* not replay */); } finally { writeUnlock(); } + return result; + } + + @Override + public void replayOnCommitted(TransactionState txnState) { + replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); } // the task is aborted when the correct number of rows is more then 0 @@ -608,57 +620,61 @@ public void onCommitted(TransactionState txnState, boolean isReplay) throws Tran // txn will be aborted but progress will be update // progress will be update otherwise the progress will be hung @Override - public void onAborted(TransactionState txnState, String txnStatusChangeReasonString, boolean isReplay) { + public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReasonString) { + ListenResult result = ListenResult.UNCHANGED; writeLock(); try { - if (isReplay) { - // only update progress - if (txnState.getTxnCommitAttachment() != null) { - updateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(), isReplay); - } - } else { - // step0: find task in job - Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter( - entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); - if (routineLoadTaskInfoOptional.isPresent()) { - // step1: job state will be changed depending on txnStatusChangeReasonString - if (txnStatusChangeReasonString != null) { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id", - txnState.getTransactionId()).add("msg", - "txn abort with reason " + txnStatusChangeReasonString).build()); - TransactionState.TxnStatusChangeReason txnStatusChangeReason = TransactionState.TxnStatusChangeReason.fromString( - txnStatusChangeReasonString); - if (txnStatusChangeReason != null) { - switch (txnStatusChangeReason) { - case OFFSET_OUT_OF_RANGE: - updateState(JobState.CANCELLED, txnStatusChangeReason.toString(), isReplay); - return; - default: - break; - } + // step0: find task in job + Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream().filter( + entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); + if (routineLoadTaskInfoOptional.isPresent()) { + // step1: job state will be changed depending on txnStatusChangeReasonString + if (txnStatusChangeReasonString != null) { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id", + txnState.getTransactionId()).add("msg", + "txn abort with reason " + txnStatusChangeReasonString).build()); + TransactionState.TxnStatusChangeReason txnStatusChangeReason = TransactionState.TxnStatusChangeReason.fromString( + txnStatusChangeReasonString); + if (txnStatusChangeReason != null) { + switch (txnStatusChangeReason) { + case OFFSET_OUT_OF_RANGE: + updateState(JobState.CANCELLED, txnStatusChangeReason.toString(), + false /* not replay */); + return result; + default: + break; } - // todo(ml): use previous be id depend on change reason - } else { - LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id", - txnState.getTransactionId()).add("msg", "txn abort").build()); } - // step2: commit task , update progress, maybe create a new task - executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); + // todo(ml): use previous be id depend on change reason + } else { + LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("txn_id", + txnState.getTransactionId()).add("msg", "txn abort").build()); } + // step2: commit task , update progress, maybe create a new task + executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); + result = ListenResult.CHANGED; } } catch (Exception e) { updateState(JobState.PAUSED, - "failed to renew task when txn has been aborted with error " + e.getMessage(), isReplay); + "failed to renew task when txn has been aborted with error " + e.getMessage(), + false /* not replay */); // TODO(ml): edit log LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage(), e); } finally { writeUnlock(); } + return result; + } + + @Override + public void replayOnAborted(TransactionState txnState) { + replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment()); } // check task exists or not before call method - private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState) + private ListenResult executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState) throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + ListenResult result = ListenResult.UNCHANGED; // step0: get progress from transaction state RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); if (rlTaskTxnCommitAttachment == null) { @@ -670,7 +686,8 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact .build()); } else if (checkCommitInfo(rlTaskTxnCommitAttachment)) { // step2: update job progress - updateProgress(rlTaskTxnCommitAttachment, false /* not replay */); + updateProgress(rlTaskTxnCommitAttachment); + result = ListenResult.CHANGED; } if (state == JobState.RUNNING) { @@ -678,6 +695,8 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); } + + return result; } protected static void unprotectedCheckMeta(Database db, String tblName, RoutineLoadDesc routineLoadDesc) @@ -780,12 +799,14 @@ protected void unprotectUpdateState(JobState jobState, String reason, boolean is default: break; } + if (!isReplay) { Catalog.getInstance().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState)); } LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("current_job_state", getState()) .add("msg", "job state has been changed") + .add("is replay", String.valueOf(isReplay)) .build()); } @@ -899,8 +920,8 @@ public void write(DataOutput out) throws IOException { out.writeInt(maxBatchRows); out.writeInt(maxBatchSizeBytes); progress.write(out); - out.writeLong(currentErrorNum); - out.writeLong(currentTotalNum); + out.writeLong(currentErrorRows); + out.writeLong(currentTotalRows); Text.writeString(out, origStmt); } @@ -931,8 +952,8 @@ public void readFields(DataInput in) throws IOException { throw new IOException("unknown data source type: " + dataSourceType); } - currentErrorNum = in.readLong(); - currentTotalNum = in.readLong(); + currentErrorRows = in.readLong(); + currentTotalRows = in.readLong(); origStmt = Text.readString(in); // parse the origin stmt to get routine load desc diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 6e03fa49d7c7d2..50cb97f4968f4a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -153,6 +153,7 @@ public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, St ConnectContext.get().getRemoteIP(), createRoutineLoadStmt.getDBTableName()); } + RoutineLoadJob routineLoadJob = null; LoadDataSourceType type = LoadDataSourceType.valueOf(createRoutineLoadStmt.getTypeName()); switch (type) { @@ -173,7 +174,7 @@ public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) throws DdlException // check if db.routineLoadName has been used if (isNameUsed(routineLoadJob.getDbId(), routineLoadJob.getName())) { throw new DdlException("Name " + routineLoadJob.getName() + " already used in db " - + routineLoadJob.getDbId()); + + routineLoadJob.getDbId()); } unprotectedAddJob(routineLoadJob); @@ -187,23 +188,18 @@ public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) throws DdlException private void unprotectedAddJob(RoutineLoadJob routineLoadJob) { idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob); - if (dbToNameToRoutineLoadJob.containsKey(routineLoadJob.getDbId())) { - Map> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get( - routineLoadJob.getDbId()); - if (nameToRoutineLoadJob.containsKey(routineLoadJob.getName())) { - nameToRoutineLoadJob.get(routineLoadJob.getName()).add(routineLoadJob); - } else { - List routineLoadJobList = Lists.newArrayList(); - routineLoadJobList.add(routineLoadJob); - nameToRoutineLoadJob.put(routineLoadJob.getName(), routineLoadJobList); - } - } else { - List routineLoadJobList = Lists.newArrayList(); - routineLoadJobList.add(routineLoadJob); - Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); - nameToRoutineLoadJob.put(routineLoadJob.getName(), routineLoadJobList); + + Map> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()); + if (nameToRoutineLoadJob == null) { + nameToRoutineLoadJob = Maps.newConcurrentMap(); dbToNameToRoutineLoadJob.put(routineLoadJob.getDbId(), nameToRoutineLoadJob); } + List routineLoadJobList = nameToRoutineLoadJob.get(routineLoadJob.getName()); + if (routineLoadJobList == null) { + routineLoadJobList = Lists.newArrayList(); + nameToRoutineLoadJob.put(routineLoadJob.getName(), routineLoadJobList); + } + routineLoadJobList.add(routineLoadJob); // register txn state listener Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().register(routineLoadJob); } @@ -278,8 +274,7 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th ConnectContext.get().getRemoteIP(), tableName); } - routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, "user operation", - false /* not replay */); + routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, "user operation", false /* not replay */); } public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) throws DdlException, AnalysisException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java index eb3b593c258d47..0b0eb90fe9c87e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java @@ -17,9 +17,53 @@ package org.apache.doris.load.routineload; +import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + public abstract class RoutineLoadProgress implements Writable { + protected LoadDataSourceType loadDataSourceType; + protected boolean isTypeRead = false; + + public void setTypeRead(boolean isTypeRead) { + this.isTypeRead = isTypeRead; + } + + public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) { + this.loadDataSourceType = loadDataSourceType; + } + abstract void update(RoutineLoadProgress progress); + + public static RoutineLoadProgress read(DataInput in) throws IOException { + RoutineLoadProgress progress = null; + LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in)); + if (type == LoadDataSourceType.KAFKA) { + progress = new KafkaProgress(); + } else { + throw new IOException("Unknown load data source type: " + type.name()); + } + + progress.setTypeRead(true); + progress.readFields(in); + return progress; + } + + @Override + public void write(DataOutput out) throws IOException { + // ATTN: must write type first + Text.writeString(out, loadDataSourceType.name()); + } + + @Override + public void readFields(DataInput in) throws IOException { + if (!isTypeRead) { + loadDataSourceType = LoadDataSourceType.valueOf(Text.readString(in)); + isTypeRead = true; + } + } } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 4b0d1c416067fe..9f4a466d5400aa 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -143,7 +143,6 @@ public long beginTransaction(long dbId, String label, long timestamp, return txnLabels.get(label); } } - throw new LabelAlreadyUsedException(label); } if (runningTxnNums.get(dbId) != null @@ -364,7 +363,7 @@ public void commitTransaction(long dbId, long transactionId, List getReadyToPublishTransactions() { long dbId = transactionState.getDbId(); Database db = catalog.getDb(dbId); if (null == db) { - transactionState.setTransactionStatus(TransactionStatus.ABORTED, false /* not replay */); + transactionState.setTransactionStatus(TransactionStatus.ABORTED); unprotectUpsertTransactionState(transactionState); continue; } @@ -572,7 +571,7 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) { if (db == null) { writeLock(); try { - transactionState.setTransactionStatus(TransactionStatus.ABORTED, false /* not replay */); + transactionState.setTransactionStatus(TransactionStatus.ABORTED); transactionState.setReason("db is dropped"); LOG.warn("db is dropped during transaction, abort transaction {}", transactionState); unprotectUpsertTransactionState(transactionState); @@ -696,7 +695,7 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) { try { transactionState.setErrorReplicas(errorReplicaIds); transactionState.setFinishTime(System.currentTimeMillis()); - transactionState.setTransactionStatus(TransactionStatus.VISIBLE, false /* not replay */); + transactionState.setTransactionStatus(TransactionStatus.VISIBLE); unprotectUpsertTransactionState(transactionState); } catch (TransactionException e) { LOG.warn("failed to change transaction {} status to visible", transactionState.getTransactionId()); @@ -795,7 +794,7 @@ public void removeOldTransactions() { || !checkTxnHasRelatedJob(transactionState, dbIdToTxnIds)) { try { transactionState.setTransactionStatus(TransactionStatus.ABORTED, - TransactionState.TxnStatusChangeReason.TIMEOUT.name(), false /* not replay */); + TransactionState.TxnStatusChangeReason.TIMEOUT.name()); } catch (TransactionException e) { LOG.warn("txn {} could not be aborted with error message {}", transactionState.getTransactionId(), e.getMessage()); @@ -896,7 +895,7 @@ private void unprotectAbortTransaction(long transactionId, String reason, TxnCom } transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setReason(reason); - transactionState.setTransactionStatus(TransactionStatus.ABORTED, reason, false /* not replay */); + transactionState.setTransactionStatus(TransactionStatus.ABORTED, reason); unprotectUpsertTransactionState(transactionState); for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); @@ -909,7 +908,7 @@ public void replayUpsertTransactionState(TransactionState transactionState) { writeLock(); try { // set transaction status will call txn state change listener - transactionState.setTransactionStatus(transactionState.getTransactionStatus(), true /* is replay */); + transactionState.replaySetTransactionStatus(); Database db = catalog.getDb(transactionState.getDbId()); if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { LOG.debug("replay a committed transaction {}", transactionState); @@ -923,9 +922,6 @@ public void replayUpsertTransactionState(TransactionState transactionState) { updateTxnLabels(transactionState); updateDBRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(), transactionState); - } catch (TransactionException e) { - // should not happen - throw new RuntimeException(e); } finally { writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 0292cde4fbe006..cfd3cebfed3423 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -19,10 +19,12 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.common.Config; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.metric.MetricRepo; import org.apache.doris.task.PublishVersionTask; +import org.apache.doris.transaction.TxnStateChangeListener.ListenResult; import com.google.common.base.Joiner; import com.google.common.collect.Maps; @@ -135,6 +137,10 @@ public String toString() { private long listenerId; + // the result of calling txn state change listener. + // this is used for replaying + private ListenResult listenResult = ListenResult.UNCHANGED; + // optional private TxnCommitAttachment txnCommitAttachment; @@ -261,56 +267,67 @@ public TxnCommitAttachment getTxnCommitAttachment() { return txnCommitAttachment; } - public void setTransactionStatus(TransactionStatus transactionStatus, boolean isReplay) + public void setTransactionStatus(TransactionStatus transactionStatus) throws TransactionException { - setTransactionStatus(transactionStatus, null, isReplay); + setTransactionStatus(transactionStatus, null); } - public void setTransactionStatus(TransactionStatus transactionStatus, String txnStatusChangeReason, - boolean isReplay) throws TransactionException { - // before state changed + public void setTransactionStatus(TransactionStatus transactionStatus, String txnStatusChangeReason) + throws TransactionException { + // before status changed TxnStateChangeListener listener = Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().getListener(listenerId); - if (!isReplay) { - if (listener != null) { - switch (transactionStatus) { - case ABORTED: - listener.beforeAborted(this, txnStatusChangeReason); - break; - case COMMITTED: - listener.beforeCommitted(this); - default: - break; - } + if (listener != null) { + switch (transactionStatus) { + case ABORTED: + listener.beforeAborted(this, txnStatusChangeReason); + break; + case COMMITTED: + listener.beforeCommitted(this); + default: + break; } } + + // status changed + this.preStatus = this.transactionStatus; + this.transactionStatus = transactionStatus; - // if is replay, the status is already be set - if (!isReplay) { - // state changed - this.preStatus = this.transactionStatus; - this.transactionStatus = transactionStatus; - } - - // after state changed + // after status changed if (transactionStatus == TransactionStatus.VISIBLE) { - if (!isReplay) { - this.latch.countDown(); - if (MetricRepo.isInit.get()) { - MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); - } + this.latch.countDown(); + if (MetricRepo.isInit.get()) { + MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); } } else if (transactionStatus == TransactionStatus.ABORTED) { if (MetricRepo.isInit.get()) { MetricRepo.COUNTER_TXN_FAILED.increase(1L); } if (listener != null) { - listener.onAborted(this, txnStatusChangeReason, isReplay); + listenResult = listener.onAborted(this, txnStatusChangeReason); } } else if (transactionStatus == TransactionStatus.COMMITTED && listener != null) { - listener.onCommitted(this, isReplay); + listenResult = listener.onCommitted(this); } } + public void replaySetTransactionStatus() { + // no need to set status, status is already set + // here we only care about listener callback + if (listenResult == ListenResult.UNCHANGED) { + return; + } + + TxnStateChangeListener listener = Catalog.getCurrentGlobalTransactionMgr().getListenerRegistry().getListener( + listenerId); + if (listener != null) { + if (transactionStatus == TransactionStatus.ABORTED) { + listener.replayOnAborted(this); + } else if (transactionStatus == TransactionStatus.COMMITTED) { + listener.replayOnCommitted(this); + } + } + } + public void waitTransactionVisible(long timeoutMillis) throws InterruptedException { this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); } @@ -415,15 +432,14 @@ public void write(DataOutput out) throws IOException { for (long errorReplciaId : errorReplicas) { out.writeLong(errorReplciaId); } - // TODO(ml): persistent will be enable after all of routine load work finished. -// if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_46) { -// if (txnCommitAttachment == null) { -// out.writeBoolean(false); -// } else { -// out.writeBoolean(true); -// txnCommitAttachment.write(out); -// } -// } + + if (txnCommitAttachment == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + txnCommitAttachment.write(out); + } + Text.writeString(out, listenResult.name()); } @Override @@ -448,13 +464,12 @@ public void readFields(DataInput in) throws IOException { for (int i = 0; i < errorReplicaNum; ++i) { errorReplicas.add(in.readLong()); } - // TODO(ml): persistent will be enable after all of routine load work finished. -// if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_46) { -// if (in.readBoolean()) { -// txnCommitAttachment = TransactionStateExtra.readTxnCommitAttachment(in, sourceType); -// } -// } - - // TODO(ml): reload txnStateChangeListener by txnCommitAttachment + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_49) { + if (in.readBoolean()) { + txnCommitAttachment = TxnCommitAttachment.read(in); + } + listenResult = ListenResult.valueOf(Text.readString(in)); + } } } diff --git a/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java index 784ee122512618..206ea0959b2a02 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java +++ b/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java @@ -17,15 +17,29 @@ package org.apache.doris.transaction; +import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment; import org.apache.doris.thrift.TTxnCommitAttachment; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; public abstract class TxnCommitAttachment implements Writable { + protected TransactionState.LoadJobSourceType sourceType; + protected boolean isTypeRead = false; + + public TxnCommitAttachment(TransactionState.LoadJobSourceType sourceType) { + this.sourceType = sourceType; + } + + public void setTypeRead(boolean isTypeRead) { + this.isTypeRead = isTypeRead; + } + public static TxnCommitAttachment readTxnCommitAttachment(DataInput in, TransactionState.LoadJobSourceType sourceType) throws IOException { @@ -51,4 +65,32 @@ public static TxnCommitAttachment fromThrift(TTxnCommitAttachment txnCommitAttac return null; } } + + public static TxnCommitAttachment read(DataInput in) throws IOException { + TxnCommitAttachment attachment = null; + LoadJobSourceType type = LoadJobSourceType.valueOf(Text.readString(in)); + if (type == LoadJobSourceType.ROUTINE_LOAD_TASK) { + attachment = new RLTaskTxnCommitAttachment(); + } else { + throw new IOException("Unknown load job source type: " + type.name()); + } + + attachment.setTypeRead(true); + attachment.readFields(in); + return attachment; + } + + @Override + public void write(DataOutput out) throws IOException { + // ATTN: must write type first + Text.writeString(out, sourceType.name()); + } + + @Override + public void readFields(DataInput in) throws IOException { + if (!isTypeRead) { + sourceType = LoadJobSourceType.valueOf(Text.readString(in)); + isTypeRead = true; + } + } } diff --git a/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java b/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java index 3c80ddd7d263e2..e286d8a951c0d4 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java +++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeListener.java @@ -17,26 +17,24 @@ package org.apache.doris.transaction; -public abstract class TxnStateChangeListener { +public interface TxnStateChangeListener { - protected long id; - - public long getId() { - return id; + public enum ListenResult { + CHANGED, UNCHANGED } - public TxnStateChangeListener(long id) { - this.id = id; - } + public long getId(); - public abstract void beforeCommitted(TransactionState txnState) throws TransactionException; + public void beforeCommitted(TransactionState txnState) throws TransactionException; /** * update catalog of job which has related txn after transaction has been committed * * @param txnState */ - public abstract void onCommitted(TransactionState txnState, boolean isReplay) throws TransactionException; + public ListenResult onCommitted(TransactionState txnState) throws TransactionException; + + public void replayOnCommitted(TransactionState txnState); /** * this interface is executed before txn aborted, you can check if txn could be abort in this stage @@ -46,14 +44,18 @@ public TxnStateChangeListener(long id) { * @throws AbortTransactionException if transaction could not be abort or there are some exception before aborted, * it will throw this exception */ - public abstract void beforeAborted(TransactionState txnState, String txnStatusChangeReason) + public void beforeAborted(TransactionState txnState, String txnStatusChangeReason) throws AbortTransactionException; /** * this interface is executed when transaction has been aborted * * @param txnState - * @param txnStatusChangeReason maybe null + * @param txnStatusChangeReason + * maybe null + * @return */ - public abstract void onAborted(TransactionState txnState, String txnStatusChangeReason, boolean isReplay); + public ListenResult onAborted(TransactionState txnState, String txnStatusChangeReason); + + public void replayOnAborted(TransactionState txnState); } diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 30b37cfbcd1881..a7bc4da11da3ae 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -320,7 +320,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1"); - transactionState.setTransactionStatus(TransactionStatus.PREPARE, false); + transactionState.setTransactionStatus(TransactionStatus.PREPARE); Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob); Map idToTransactionState = Maps.newHashMap(); idToTransactionState.put(1L, transactionState); @@ -393,7 +393,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, 1L, "label", 1L, LoadJobSourceType.ROUTINE_LOAD_TASK, "be1"); - transactionState.setTransactionStatus(TransactionStatus.PREPARE, false); + transactionState.setTransactionStatus(TransactionStatus.PREPARE); Deencapsulation.setField(transactionState, "txnStateChangeListener", routineLoadJob); Map idToTransactionState = Maps.newHashMap(); idToTransactionState.put(1L, transactionState);