From 9b824ff81702f020cae0a8b503eab3a760609dc7 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Tue, 15 Oct 2019 19:54:23 +0800 Subject: [PATCH 1/6] [Bug-fix][Broker-load] Fix the bug of the label already exists when the txn has been finished If the FE is restarted after a txn has been committed but before it becomes visible, the load job is rescheduled and fails with a "label already exists" error. The cause is an inconsistency between the transaction of the load job and the metadata of the load job. Therefore, the txn attachment must also be replayed in the function replayOnCommitted. The load job's state and progress are correct after that. --- .../doris/load/loadv2/BrokerLoadJob.java | 7 +------ .../org/apache/doris/load/loadv2/LoadJob.java | 21 ++++++------------- .../apache/doris/load/loadv2/MiniLoadJob.java | 17 +-------------- .../transaction/GlobalTransactionMgr.java | 5 +++++ .../doris/load/loadv2/BrokerLoadJobTest.java | 4 ++-- 5 files changed, 15 insertions(+), 39 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index e9f70bfab82eec..d2555cc80503bb 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -484,7 +484,7 @@ private String increaseCounter(String key, String deltaValue) { } @Override - protected void executeReplayOnAborted(TransactionState txnState) { + protected void executeReplayTxnAttachment(TransactionState txnState) { if (txnState.getTxnCommitAttachment() == null) { // The txn attachment maybe null when broker load has been cancelled without attachment. 
// The end log of broker load has been record but the callback id of txnState hasn't been removed @@ -494,11 +494,6 @@ protected void executeReplayOnAborted(TransactionState txnState) { unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); } - @Override - protected void executeReplayOnVisible(TransactionState txnState) { - unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); - } - @Override public void write(DataOutput out) throws IOException { super.write(out); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 2ac879cda8c57b..a7a24facb14654 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -743,7 +743,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw public void replayOnCommitted(TransactionState txnState) { writeLock(); try { - isCommitting = true; + executeReplayTxnAttachment(txnState); } finally { writeUnlock(); } @@ -770,7 +770,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String return; } // record attachment in load job - executeAfterAborted(txnState); + executeReplayTxnAttachment(txnState); // cancel load job unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason), false); } finally { @@ -778,9 +778,6 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String } } - protected void executeAfterAborted(TransactionState txnState) { - } - /** * This method is used to replay the cancelled state of load job * @@ -790,7 +787,7 @@ protected void executeAfterAborted(TransactionState txnState) { public void replayOnAborted(TransactionState txnState) { writeLock(); try { - executeReplayOnAborted(txnState); + executeReplayTxnAttachment(txnState); failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, 
txnState.getReason()); finishTimestamp = txnState.getFinishTime(); state = JobState.CANCELLED; @@ -799,9 +796,6 @@ public void replayOnAborted(TransactionState txnState) { } } - protected void executeReplayOnAborted(TransactionState txnState) { - } - /** * This method will finish the load job without edit log. * The job will be finished by replayOnVisible when txn journal replay @@ -814,18 +808,15 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) { if (!txnOperated) { return; } - executeAfterVisible(txnState); + executeReplayTxnAttachment(txnState); updateState(JobState.FINISHED); } - protected void executeAfterVisible(TransactionState txnState) { - } - @Override public void replayOnVisible(TransactionState txnState) { writeLock(); try { - executeReplayOnVisible(txnState); + executeReplayTxnAttachment(txnState); progress = 100; finishTimestamp = txnState.getFinishTime(); state = JobState.FINISHED; @@ -834,7 +825,7 @@ public void replayOnVisible(TransactionState txnState) { } } - protected void executeReplayOnVisible(TransactionState txnState) { + protected void executeReplayTxnAttachment(TransactionState txnState) { } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index bc75ec860a2f25..ab24c1f541865f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -94,22 +94,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti } @Override - protected void executeAfterAborted(TransactionState txnState) { - updateLoadingStatue(txnState); - } - - @Override - protected void executeAfterVisible(TransactionState txnState) { - updateLoadingStatue(txnState); - } - - @Override - protected void executeReplayOnAborted(TransactionState txnState) { - updateLoadingStatue(txnState); - } - - @Override - protected void 
executeReplayOnVisible(TransactionState txnState) { + protected void executeReplayTxnAttachment(TransactionState txnState) { updateLoadingStatue(txnState); } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index ad88468a4467e7..0839066a1971fa 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -602,6 +602,11 @@ public List getReadyToPublishTransactions() throws UserExcepti * @return */ public void finishTransaction(long transactionId, Set errorReplicaIds) throws UserException { + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + e.printStackTrace(); + } TransactionState transactionState = idToTransactionState.get(transactionId); // add all commit errors and publish errors to a single set if (errorReplicaIds == null) { diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index dec2c6b0331953..9e36ad9c9db2f3 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -398,7 +398,7 @@ public void testExecuteReplayOnAborted(@Injectable TransactionState txnState, result = JobState.CANCELLED; } }; - brokerLoadJob.executeReplayOnAborted(txnState); + brokerLoadJob.executeReplayTxnAttachment(txnState); Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress")); Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp()); Assert.assertEquals(JobState.CANCELLED, brokerLoadJob.getState()); @@ -424,7 +424,7 @@ public void testExecuteReplayOnVisible(@Injectable TransactionState txnState, result = JobState.LOADING; } }; - brokerLoadJob.executeReplayOnAborted(txnState); + brokerLoadJob.executeReplayTxnAttachment(txnState); 
Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress")); Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp()); Assert.assertEquals(JobState.LOADING, brokerLoadJob.getState()); From 7d2796c71a6e353061707d2b802014bdd9b7ad0e Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Tue, 15 Oct 2019 21:25:53 +0800 Subject: [PATCH 2/6] Add log --- .../java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 3 +++ .../main/java/org/apache/doris/load/loadv2/LoadJob.java | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index d2555cc80503bb..27b67c996f73ad 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -492,6 +492,9 @@ protected void executeReplayTxnAttachment(TransactionState txnState) { return; } unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) + .add("msg", "execute replay txn attachment") + .build()); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index a7a24facb14654..3ead48a4f1d230 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -744,6 +744,9 @@ public void replayOnCommitted(TransactionState txnState) { writeLock(); try { executeReplayTxnAttachment(txnState); + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) + .add("msg", "replay on committed") + .build()); } finally { writeUnlock(); } @@ -820,6 +823,11 @@ public void replayOnVisible(TransactionState txnState) { progress = 100; finishTimestamp = txnState.getFinishTime(); state = JobState.FINISHED; + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) + .add("state", state) 
+ .add("progress", progress) + .add("msg", "replay on visible") + .build()); } finally { writeUnlock(); } From c1dea4ee47446623860765d57673b61ab4f07ff8 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Tue, 15 Oct 2019 21:44:53 +0800 Subject: [PATCH 3/6] fix bug --- .../java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 3 --- .../main/java/org/apache/doris/load/loadv2/LoadJob.java | 8 -------- .../java/org/apache/doris/load/loadv2/LoadManager.java | 3 +++ 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 27b67c996f73ad..d2555cc80503bb 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -492,9 +492,6 @@ protected void executeReplayTxnAttachment(TransactionState txnState) { return; } unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); - LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) - .add("msg", "execute replay txn attachment") - .build()); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 3ead48a4f1d230..a7a24facb14654 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -744,9 +744,6 @@ public void replayOnCommitted(TransactionState txnState) { writeLock(); try { executeReplayTxnAttachment(txnState); - LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) - .add("msg", "replay on committed") - .build()); } finally { writeUnlock(); } @@ -823,11 +820,6 @@ public void replayOnVisible(TransactionState txnState) { progress = 100; finishTimestamp = txnState.getFinishTime(); state = JobState.FINISHED; - LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) - .add("state", state) - .add("progress", 
progress) - .add("msg", "replay on visible") - .build()); } finally { writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index cb6d95bc741c75..5d965bbb755d4b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -602,6 +602,9 @@ public void readFields(DataInput in) throws IOException { map.put(loadJob.getLabel(), jobs); } jobs.add(loadJob); + if (!loadJob.isCompleted()) { + Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); + } } } } From 65749df0ebd398330b2f9344463ef894f2e0890e Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Wed, 16 Oct 2019 10:25:23 +0800 Subject: [PATCH 4/6] Remove the sleep --- .../org/apache/doris/transaction/GlobalTransactionMgr.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 0839066a1971fa..ad88468a4467e7 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -602,11 +602,6 @@ public List getReadyToPublishTransactions() throws UserExcepti * @return */ public void finishTransaction(long transactionId, Set errorReplicaIds) throws UserException { - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - e.printStackTrace(); - } TransactionState transactionState = idToTransactionState.get(transactionId); // add all commit errors and publish errors to a single set if (errorReplicaIds == null) { From e68abbb91217439a109b04552580ce83ce117b7e Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Wed, 16 Oct 2019 10:41:49 +0800 Subject: [PATCH 5/6] Add some comments --- 
.../org/apache/doris/load/loadv2/BrokerLoadJob.java | 2 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 12 ++++++------ .../org/apache/doris/load/loadv2/LoadManager.java | 3 +++ .../org/apache/doris/load/loadv2/MiniLoadJob.java | 2 +- .../apache/doris/load/loadv2/BrokerLoadJobTest.java | 4 ++-- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index d2555cc80503bb..f051b6895db496 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -484,7 +484,7 @@ private String increaseCounter(String key, String deltaValue) { } @Override - protected void executeReplayTxnAttachment(TransactionState txnState) { + protected void replayTxnAttachment(TransactionState txnState) { if (txnState.getTxnCommitAttachment() == null) { // The txn attachment maybe null when broker load has been cancelled without attachment. 
// The end log of broker load has been record but the callback id of txnState hasn't been removed diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index a7a24facb14654..eff3607cc4f6bb 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -743,7 +743,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw public void replayOnCommitted(TransactionState txnState) { writeLock(); try { - executeReplayTxnAttachment(txnState); + replayTxnAttachment(txnState); } finally { writeUnlock(); } @@ -770,7 +770,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String return; } // record attachment in load job - executeReplayTxnAttachment(txnState); + replayTxnAttachment(txnState); // cancel load job unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason), false); } finally { @@ -787,7 +787,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String public void replayOnAborted(TransactionState txnState) { writeLock(); try { - executeReplayTxnAttachment(txnState); + replayTxnAttachment(txnState); failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnState.getReason()); finishTimestamp = txnState.getFinishTime(); state = JobState.CANCELLED; @@ -808,7 +808,7 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) { if (!txnOperated) { return; } - executeReplayTxnAttachment(txnState); + replayTxnAttachment(txnState); updateState(JobState.FINISHED); } @@ -816,7 +816,7 @@ public void afterVisible(TransactionState txnState, boolean txnOperated) { public void replayOnVisible(TransactionState txnState) { writeLock(); try { - executeReplayTxnAttachment(txnState); + replayTxnAttachment(txnState); progress = 100; finishTimestamp = txnState.getFinishTime(); state = JobState.FINISHED; 
@@ -825,7 +825,7 @@ public void replayOnVisible(TransactionState txnState) { } } - protected void executeReplayTxnAttachment(TransactionState txnState) { + protected void replayTxnAttachment(TransactionState txnState) { } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 5d965bbb755d4b..9e4204815be5fb 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -602,6 +602,9 @@ public void readFields(DataInput in) throws IOException { map.put(loadJob.getLabel(), jobs); } jobs.add(loadJob); + // The callback of load job which is replayed by image need to be registered in callback factory. + // The commit and visible txn will callback the unfinished load job. + // Otherwise, the load job always does not be completed while the txn is visible. if (!loadJob.isCompleted()) { Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index ab24c1f541865f..6566209d841f8d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -94,7 +94,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti } @Override - protected void executeReplayTxnAttachment(TransactionState txnState) { + protected void replayTxnAttachment(TransactionState txnState) { updateLoadingStatue(txnState); } diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 9e36ad9c9db2f3..9fe22be6b106cc 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ 
b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -398,7 +398,7 @@ public void testExecuteReplayOnAborted(@Injectable TransactionState txnState, result = JobState.CANCELLED; } }; - brokerLoadJob.executeReplayTxnAttachment(txnState); + brokerLoadJob.replayTxnAttachment(txnState); Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress")); Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp()); Assert.assertEquals(JobState.CANCELLED, brokerLoadJob.getState()); @@ -424,7 +424,7 @@ public void testExecuteReplayOnVisible(@Injectable TransactionState txnState, result = JobState.LOADING; } }; - brokerLoadJob.executeReplayTxnAttachment(txnState); + brokerLoadJob.replayTxnAttachment(txnState); Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress")); Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp()); Assert.assertEquals(JobState.LOADING, brokerLoadJob.getState()); From 4c60657accc23307a759d6f2b352e350e8807a04 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Wed, 16 Oct 2019 14:49:03 +0800 Subject: [PATCH 6/6] Add comment --- fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index eff3607cc4f6bb..55ea11aa4a79b1 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -115,7 +115,11 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected int progress; // non-persistence + // This param is set true during txn is committing. + // During committing, the load job could not be cancelled. protected boolean isCommitting = false; + // This param is set true in mini load. + // The streaming mini load could not be cancelled by frontend. 
protected boolean isCancellable = true; // only for persistence param