Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private String increaseCounter(String key, String deltaValue) {
}

@Override
protected void executeReplayOnAborted(TransactionState txnState) {
protected void replayTxnAttachment(TransactionState txnState) {
if (txnState.getTxnCommitAttachment() == null) {
// The txn attachment maybe null when broker load has been cancelled without attachment.
// The end log of broker load has been record but the callback id of txnState hasn't been removed
Expand All @@ -494,11 +494,6 @@ protected void executeReplayOnAborted(TransactionState txnState) {
unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
}

@Override
protected void executeReplayOnVisible(TransactionState txnState) {
unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Expand Down
25 changes: 10 additions & 15 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
protected int progress;

// non-persistence
// This param is set true during txn is committing.
// During committing, the load job could not be cancelled.
protected boolean isCommitting = false;
// This param is set true in mini load.
// The streaming mini load could not be cancelled by frontend.
protected boolean isCancellable = true;

// only for persistence param
Expand Down Expand Up @@ -743,7 +747,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
public void replayOnCommitted(TransactionState txnState) {
writeLock();
try {
isCommitting = true;
replayTxnAttachment(txnState);
} finally {
writeUnlock();
}
Expand All @@ -770,17 +774,14 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
return;
}
// record attachment in load job
executeAfterAborted(txnState);
replayTxnAttachment(txnState);
// cancel load job
unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason), false);
} finally {
writeUnlock();
}
}

protected void executeAfterAborted(TransactionState txnState) {
}

/**
* This method is used to replay the cancelled state of load job
*
Expand All @@ -790,7 +791,7 @@ protected void executeAfterAborted(TransactionState txnState) {
public void replayOnAborted(TransactionState txnState) {
writeLock();
try {
executeReplayOnAborted(txnState);
replayTxnAttachment(txnState);
failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnState.getReason());
finishTimestamp = txnState.getFinishTime();
state = JobState.CANCELLED;
Expand All @@ -799,9 +800,6 @@ public void replayOnAborted(TransactionState txnState) {
}
}

protected void executeReplayOnAborted(TransactionState txnState) {
}

/**
* This method will finish the load job without edit log.
* The job will be finished by replayOnVisible when txn journal replay
Expand All @@ -814,18 +812,15 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) {
if (!txnOperated) {
return;
}
executeAfterVisible(txnState);
replayTxnAttachment(txnState);
updateState(JobState.FINISHED);
}

protected void executeAfterVisible(TransactionState txnState) {
}

@Override
public void replayOnVisible(TransactionState txnState) {
writeLock();
try {
executeReplayOnVisible(txnState);
replayTxnAttachment(txnState);
progress = 100;
finishTimestamp = txnState.getFinishTime();
state = JobState.FINISHED;
Expand All @@ -834,7 +829,7 @@ public void replayOnVisible(TransactionState txnState) {
}
}

protected void executeReplayOnVisible(TransactionState txnState) {
protected void replayTxnAttachment(TransactionState txnState) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,12 @@ public void readFields(DataInput in) throws IOException {
map.put(loadJob.getLabel(), jobs);
}
jobs.add(loadJob);
// The callback of load job which is replayed by image need to be registered in callback factory.
// The commit and visible txn will callback the unfinished load job.
// Otherwise, the load job always does not be completed while the txn is visible.
if (!loadJob.isCompleted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comment to explain this operation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should use another way to achieve this. When this function is called, Catalog is not ready. However you start to use Catalog's item, this is error-prone.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback need to be add in here. The reason is that the replay of txn maybe use the callback and reload the job state.

Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in prepareJobs();

}
}
}
}
17 changes: 1 addition & 16 deletions fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
}

@Override
protected void executeAfterAborted(TransactionState txnState) {
updateLoadingStatue(txnState);
}

@Override
protected void executeAfterVisible(TransactionState txnState) {
updateLoadingStatue(txnState);
}

@Override
protected void executeReplayOnAborted(TransactionState txnState) {
updateLoadingStatue(txnState);
}

@Override
protected void executeReplayOnVisible(TransactionState txnState) {
protected void replayTxnAttachment(TransactionState txnState) {
updateLoadingStatue(txnState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void testExecuteReplayOnAborted(@Injectable TransactionState txnState,
result = JobState.CANCELLED;
}
};
brokerLoadJob.executeReplayOnAborted(txnState);
brokerLoadJob.replayTxnAttachment(txnState);
Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress"));
Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp());
Assert.assertEquals(JobState.CANCELLED, brokerLoadJob.getState());
Expand All @@ -424,7 +424,7 @@ public void testExecuteReplayOnVisible(@Injectable TransactionState txnState,
result = JobState.LOADING;
}
};
brokerLoadJob.executeReplayOnAborted(txnState);
brokerLoadJob.replayTxnAttachment(txnState);
Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress"));
Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp());
Assert.assertEquals(JobState.LOADING, brokerLoadJob.getState());
Expand Down