From 2eaef5a015b9ead8b39ce48bad65913ae90f2b73 Mon Sep 17 00:00:00 2001
From: laihui <1353307710@qq.com>
Date: Tue, 10 Sep 2024 18:51:21 +0800
Subject: [PATCH] fix routine load job stuck if commit transaction failed
---
.../transaction/CloudGlobalTransactionMgr.java | 18 +++++++++++++++++-
.../doris/load/routineload/RoutineLoadJob.java | 2 +-
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index f51454ad269c51..ffec2b18f015ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -505,7 +505,17 @@ private void commitTransaction(long dbId, List
tableList, long transactio
}
final CommitTxnRequest commitTxnRequest = builder.build();
- commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
+ try {
+ commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
+ } catch (UserException e) {
+ // For routine load, it is necessary to release the write lock when commit transaction fails,
+ // otherwise it will cause the lock added in beforeCommitted to not be released.
+ if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
+ RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
+ Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+ }
+ throw e;
+ }
}
private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
@@ -1021,6 +1031,12 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,
Preconditions.checkNotNull(abortTxnResponse.getStatus());
} catch (RpcException e) {
LOG.warn("abortTxn failed, transactionId:{}, Exception", transactionId, e);
+ // For routine load, it is necessary to release the write lock when abort transaction fails,
+ // otherwise it will cause the lock added in beforeAborted to not be released.
+ if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
+ RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
+ Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+ }
throw new UserException("abortTxn failed, errMsg:" + e.getMessage());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8e3ed8c4682d44..ac4a548c62f897 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -485,7 +485,7 @@ protected void writeLock() {
lock.writeLock().lock();
}
- protected void writeUnlock() {
+ public void writeUnlock() {
lock.writeLock().unlock();
}