diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index e9f70bfab82eec..f051b6895db496 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -484,7 +484,7 @@ private String increaseCounter(String key, String deltaValue) { } @Override - protected void executeReplayOnAborted(TransactionState txnState) { + protected void replayTxnAttachment(TransactionState txnState) { if (txnState.getTxnCommitAttachment() == null) { // The txn attachment maybe null when broker load has been cancelled without attachment. // The end log of broker load has been record but the callback id of txnState hasn't been removed @@ -494,11 +494,6 @@ protected void executeReplayOnAborted(TransactionState txnState) { unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); } - @Override - protected void executeReplayOnVisible(TransactionState txnState) { - unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); - } - @Override public void write(DataOutput out) throws IOException { super.write(out); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 2ac879cda8c57b..55ea11aa4a79b1 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -115,7 +115,11 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected int progress; // non-persistence + // This param is set true during txn is committing. + // During committing, the load job could not be cancelled. protected boolean isCommitting = false; + // This param is set true in mini load. + // The streaming mini load could not be cancelled by frontend. protected boolean isCancellable = true; // only for persistence param @@ -743,7 +747,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw public void replayOnCommitted(TransactionState txnState) { writeLock(); try { - isCommitting = true; + replayTxnAttachment(txnState); } finally { writeUnlock(); } @@ -770,7 +774,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String return; } // record attachment in load job - executeAfterAborted(txnState); + replayTxnAttachment(txnState); // cancel load job unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason), false); } finally { @@ -778,9 +782,6 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String } } - protected void executeAfterAborted(TransactionState txnState) { - } - /** * This method is used to replay the cancelled state of load job * @@ -790,7 +791,7 @@ protected void executeAfterAborted(TransactionState txnState) { public void replayOnAborted(TransactionState txnState) { writeLock(); try { - executeReplayOnAborted(txnState); + replayTxnAttachment(txnState); failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnState.getReason()); finishTimestamp = txnState.getFinishTime(); state = JobState.CANCELLED; @@ -799,9 +800,6 @@ public void replayOnAborted(TransactionState txnState) { } } - protected void executeReplayOnAborted(TransactionState txnState) { - } - /** * This method will finish the load job without edit log. * The job will be finished by replayOnVisible when txn journal replay @@ -814,18 +812,15 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) { if (!txnOperated) { return; } - executeAfterVisible(txnState); + replayTxnAttachment(txnState); updateState(JobState.FINISHED); } - protected void executeAfterVisible(TransactionState txnState) { - } - @Override public void replayOnVisible(TransactionState txnState) { writeLock(); try { - executeReplayOnVisible(txnState); + replayTxnAttachment(txnState); progress = 100; finishTimestamp = txnState.getFinishTime(); state = JobState.FINISHED; @@ -834,7 +829,7 @@ public void replayOnVisible(TransactionState txnState) { } } - protected void executeReplayOnVisible(TransactionState txnState) { + protected void replayTxnAttachment(TransactionState txnState) { } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index cb6d95bc741c75..9e4204815be5fb 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -602,6 +602,12 @@ public void readFields(DataInput in) throws IOException { map.put(loadJob.getLabel(), jobs); } jobs.add(loadJob); + // The callback of load job which is replayed by image need to be registered in callback factory. + // The commit and visible txn will callback the unfinished load job. + // Otherwise, the load job always does not be completed while the txn is visible. + if (!loadJob.isCompleted()) { + Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); + } } } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index bc75ec860a2f25..6566209d841f8d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -94,22 +94,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti } @Override - protected void executeAfterAborted(TransactionState txnState) { - updateLoadingStatue(txnState); - } - - @Override - protected void executeAfterVisible(TransactionState txnState) { - updateLoadingStatue(txnState); - } - - @Override - protected void executeReplayOnAborted(TransactionState txnState) { - updateLoadingStatue(txnState); - } - - @Override - protected void executeReplayOnVisible(TransactionState txnState) { + protected void replayTxnAttachment(TransactionState txnState) { updateLoadingStatue(txnState); } diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index dec2c6b0331953..9fe22be6b106cc 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -398,7 +398,7 @@ public void testExecuteReplayOnAborted(@Injectable TransactionState txnState, result = JobState.CANCELLED; } }; - brokerLoadJob.executeReplayOnAborted(txnState); + brokerLoadJob.replayTxnAttachment(txnState); Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress")); Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp()); Assert.assertEquals(JobState.CANCELLED, brokerLoadJob.getState()); @@ -424,7 +424,7 @@ public void testExecuteReplayOnVisible(@Injectable TransactionState txnState, result = JobState.LOADING; } }; - brokerLoadJob.executeReplayOnAborted(txnState); + brokerLoadJob.replayTxnAttachment(txnState); Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress")); Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp()); Assert.assertEquals(JobState.LOADING, brokerLoadJob.getState());