diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 849a05f2c15b52..2b564e385c5b15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -354,6 +354,9 @@ public void commitTransaction(long dbId, List
tableList, long transaction
tableList.stream().map(Table::getId).collect(Collectors.toList()));
Map> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
+ if (!checkTransactionStateBeforeCommit(dbId, transactionId)) {
+ return;
+ }
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
if (lockContext.getBackendToPartitionTablets().isEmpty()) {
@@ -374,6 +377,40 @@ public void commitTransaction(long dbId, List tableList, long transaction
}
}
+ private boolean checkTransactionStateBeforeCommit(long dbId, long transactionId)
+ throws TransactionCommitFailedException {
+ // if this txn has been calculated by previously task but commit rpc is timeout,
+ // be will send another commit request to fe, so if txn is committed or visible,
+ // no need to calculate delete bitmap again, just return ok to be to finish this commit.
+ TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
+ .getTransactionState(dbId, transactionId);
+ if (transactionState == null) {
+ throw new TransactionCommitFailedException("txn does not exist: " + transactionId);
+ }
+ if (null != transactionState.getTransactionStatus()) {
+ if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+ throw new TransactionCommitFailedException("transaction [" + transactionId
+ + "] is already aborted. abort reason: " + transactionState.getReason());
+ } else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
+ || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+ LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ",
+ transactionId,
+ transactionState.getTransactionStatus().toString());
+ return false;
+ } else if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) {
+ LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
+ transactionState.getTransactionStatus().toString());
+ return true;
+ } else {
+ throw new TransactionCommitFailedException("transaction [" + transactionId
+ + "] status is: " + transactionState.getTransactionStatus().toString());
+ }
+ } else {
+ throw new TransactionCommitFailedException("transaction [" + transactionId
+ + "] status is null ");
+ }
+ }
+
/**
* Post process of commitTxn
* 1. update some stats
@@ -499,23 +536,6 @@ private void commitTransactionWithoutLock(long dbId, List tableList, long
}
if (!mowTableList.isEmpty()) {
- // may be this txn has been calculated by previously task but commit rpc is timeout,
- // and be will send another commit request to fe, so need to check txn status first
- // before sending delete bitmap task to be, if txn is committed or visible, no need to
- // calculate delete bitmap again, just return ok to be to finish this commit.
- TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
- .getTransactionState(dbId, transactionId);
- if (null != transactionState && null != transactionState.getTransactionStatus()) {
- if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
- || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
- LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId,
- transactionState.getTransactionStatus().toString());
- return;
- } else {
- LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
- transactionState.getTransactionStatus().toString());
- }
- }
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos);
}
@@ -1162,6 +1182,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List tableList,
public void commitTransaction2PC(Database db, List tableList, long transactionId, long timeoutMillis)
throws UserException {
List mowTableList = getMowTableList(tableList, null);
+ if (!mowTableList.isEmpty()) {
+ if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) {
+ return;
+ }
+ }
commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null);
}
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
index fa71c3644f2027..0ab20324d7280f 100644
--- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
@@ -16,6 +16,9 @@
// under the License.
suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
GetDebugPoint().clearDebugPointsForAllFEs()
def backendId_to_backendIP = [:]
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
index 7176aec702f411..8ffc04dd735b95 100644
--- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy
@@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {
}
}
qt_sql """ select * from ${tableName} order by id"""
+ def now = System.currentTimeMillis()
+ GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+
+ }
+ }
+ def time_diff = System.currentTimeMillis() - now
+ assertTrue(time_diff < 10000)
} finally {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
sql "DROP TABLE IF EXISTS ${tableName};"