From df4d7059f8b080cb30b5dd3742f284f5d94ff34c Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Sat, 8 Feb 2025 17:35:22 +0800 Subject: [PATCH] [fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed #45673 --- .../CloudGlobalTransactionMgr.java | 59 +++++++++++++------ ...ud_mow_stream_load_with_commit_fail.groovy | 3 + ..._cloud_mow_stream_load_with_timeout.groovy | 20 +++++++ 3 files changed, 65 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 849a05f2c15b52..2b564e385c5b15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -354,6 +354,9 @@ public void commitTransaction(long dbId, List tableList, long transaction tableList.stream().map(Table::getId).collect(Collectors.toList())); Map> backendToPartitionInfos = null; if (!mowTableList.isEmpty()) { + if (!checkTransactionStateBeforeCommit(dbId, transactionId)) { + return; + } DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext(); getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext); if (lockContext.getBackendToPartitionTablets().isEmpty()) { @@ -374,6 +377,40 @@ public void commitTransaction(long dbId, List
tableList, long transaction
         }
     }
 
+    /**
+     * Returns true when a MOW commit still needs delete bitmap calculation (PREPARE, or PRECOMMITTED for 2PC) and
+     * false when the txn is already COMMITTED/VISIBLE, i.e. BE retried a commit whose RPC timed out; throws otherwise.
+     */
+    private boolean checkTransactionStateBeforeCommit(long dbId, long transactionId)
+            throws TransactionCommitFailedException {
+        TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
+                .getTransactionState(dbId, transactionId);
+        if (transactionState == null) {
+            throw new TransactionCommitFailedException("txn does not exist: " + transactionId);
+        }
+        TransactionStatus status = transactionState.getTransactionStatus();
+        if (status == null) {
+            throw new TransactionCommitFailedException("transaction [" + transactionId + "] status is null ");
+        }
+        switch (status) {
+            case ABORTED:
+                throw new TransactionCommitFailedException("transaction [" + transactionId
+                        + "] is already aborted. abort reason: " + transactionState.getReason());
+            case COMMITTED:
+            case VISIBLE:
+                LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ",
+                        transactionId, status);
+                return false;
+            case PREPARE:
+            case PRECOMMITTED: // presumably the status the 2PC first phase leaves before commitTransaction2PC
+                LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId, status);
+                return true;
+            default:
+                throw new TransactionCommitFailedException("transaction [" + transactionId
+                        + "] status is: " + status);
+        }
+    }
+
     /**
      * Post process of commitTxn
      * 1. update some stats
@@ -499,23 +536,6 @@ private void commitTransactionWithoutLock(long dbId, List
tableList, long } if (!mowTableList.isEmpty()) { - // may be this txn has been calculated by previously task but commit rpc is timeout, - // and be will send another commit request to fe, so need to check txn status first - // before sending delete bitmap task to be, if txn is committed or visible, no need to - // calculate delete bitmap again, just return ok to be to finish this commit. - TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(dbId, transactionId); - if (null != transactionState && null != transactionState.getTransactionStatus()) { - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED - || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId, - transactionState.getTransactionStatus().toString()); - return; - } else { - LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId, - transactionState.getTransactionStatus().toString()); - } - } sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); } @@ -1162,6 +1182,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, public void commitTransaction2PC(Database db, List
tableList, long transactionId, long timeoutMillis) throws UserException { List mowTableList = getMowTableList(tableList, null); + if (!mowTableList.isEmpty()) { + if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) { + return; + } + } commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null); } diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy index fa71c3644f2027..0ab20324d7280f 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy @@ -16,6 +16,9 @@ // under the License. suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") { + if (!isCloudMode()) { + return + } GetDebugPoint().clearDebugPointsForAllFEs() def backendId_to_backendIP = [:] diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy index 7176aec702f411..8ffc04dd735b95 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy @@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") { } } qt_sql """ select * from ${tableName} order by id""" + def now = System.currentTimeMillis() + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, 
startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + + } + } + def time_diff = System.currentTimeMillis() - now + assertTrue(time_diff < 10000) } finally { GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout") sql "DROP TABLE IF EXISTS ${tableName};"