diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index e00a2211419896..b526c5886e023e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -338,14 +338,14 @@ public void preCommitTransaction2PC(Database db, List
tableList, long tra
@Deprecated
@Override
- public void commitTransaction(long dbId, List tableList,
+ public void commitTransactionWithoutLock(long dbId, List tableList,
long transactionId, List tabletCommitInfos)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
+ commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}
@Override
- public void commitTransaction(long dbId, List tableList, long transactionId,
+ public void commitTransactionWithoutLock(long dbId, List tableList, long transactionId,
List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
List mowTableList = getMowTableList(tableList, tabletCommitInfos);
@@ -1186,6 +1186,15 @@ private List getTablesNeedCommitLock(List tableList) {
}
}
+ @Override
+ public void commitTransaction(DatabaseIf db, List tableList, long transactionId,
+ List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ // There is no publish in cloud mode
+ commitAndPublishTransaction(
+ db, tableList, transactionId, tabletCommitInfos, timeoutMillis, txnCommitAttachment);
+ }
+
@Override
public boolean commitAndPublishTransaction(DatabaseIf db, List tableList, long transactionId,
List tabletCommitInfos, long timeoutMillis,
@@ -1234,7 +1243,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List tableList,
}
try {
- commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
decreaseWaitingLockCount(tablesToLock);
MetaLockUtils.commitUnlockTables(tablesToLock);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 536a7267fdd90b..05d9d0c2accdeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -361,7 +361,7 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation());
afterLoadingTaskCommitTransaction(tableList);
afterCommit();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index f01f205e96dc0d..1abc8b76b544ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -671,7 +671,7 @@ private void tryCommitJob() throws UserException {
try {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
.add("msg", "Load job try to commit txn").build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 38d81d64bfa650..381ea3bbbf77c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1854,14 +1854,23 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
subTxnInfo.getTabletCommitInfos(), null));
}
transactionState.setSubTxnIds(subTxnIds);
- return Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db, request.getTxnId(),
- subTransactionStates, timeoutMs);
- } else {
+ return Env.getCurrentGlobalTransactionMgr()
+ .commitAndPublishTransaction(db, request.getTxnId(),
+ subTransactionStates, timeoutMs);
+ } else if (!request.isOnlyCommit()) {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ } else {
+ // single table commit, so don't need to wait for publish.
+ Env.getCurrentGlobalTransactionMgr()
+ .commitTransaction(db, tableList,
+ request.getTxnId(),
+ TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+ TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ return true;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index ff1115bf0a6990..6552c65d412464 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -215,10 +215,11 @@ private void preCommitTransaction2PC(long dbId, List tableList, long tran
}
@Deprecated
- public void commitTransaction(long dbId, List tableList,
+ @Override
+ public void commitTransactionWithoutLock(long dbId, List tableList,
long transactionId, List tabletCommitInfos)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
+ commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}
@Override
@@ -234,7 +235,8 @@ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
- public void commitTransaction(long dbId, List tableList, long transactionId,
+ @Override
+ public void commitTransactionWithoutLock(long dbId, List tableList, long transactionId,
List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@@ -251,7 +253,7 @@ public void commitTransaction(long dbId, List tableList, long transaction
/**
* @note callers should get all tables' write locks before call this api
*/
- public void commitTransaction(long dbId, List tableList, long transactionId,
+ public void commitTransactionWithoutLock(long dbId, List tableList, long transactionId,
List subTransactionStates, long timeout) throws UserException {
if (Config.disable_load_job) {
throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
@@ -264,6 +266,21 @@ public void commitTransaction(long dbId, List tableList, long transaction
dbTransactionMgr.commitTransaction(transactionId, tableList, subTransactionStates);
}
+ @Override
+ public void commitTransaction(DatabaseIf db, List tableList, long transactionId,
+ List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
+ throw new UserException("get tableList write lock timeout, tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+ }
+
private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
@@ -291,7 +308,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List tableList,
+ StringUtils.join(tableList, ",") + ")");
}
try {
- commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -320,7 +337,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
+ StringUtils.join(tableList, ",") + ")");
}
try {
- commitTransaction(db.getId(), tableList, transactionId, subTransactionStates, timeoutMillis);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, subTransactionStates, timeoutMillis);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -351,7 +368,7 @@ public void commitTransaction2PC(Database db, List tableList, long transa
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms."
- + " data will be visable later.", transactionId, stopWatch.getTime());
+ + " data will be visible later.", transactionId, stopWatch.getTime());
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index c8d436e68d8356..38c8cdd5a6d07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -78,14 +78,19 @@ public void preCommitTransaction2PC(Database db, List tableList, long tra
throws UserException;
@Deprecated
- public void commitTransaction(long dbId, List tableList,
+ public void commitTransactionWithoutLock(long dbId, List tableList,
long transactionId, List tabletCommitInfos)
throws UserException;
- public void commitTransaction(long dbId, List tableList, long transactionId,
+ public void commitTransactionWithoutLock(long dbId, List tableList, long transactionId,
List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException;
+ public void commitTransaction(DatabaseIf db, List tableList, long transactionId,
+ List tabletCommitInfos, long timeoutMillis,
+ TxnCommitAttachment txnCommitAttachment)
+ throws UserException;
+
public boolean commitAndPublishTransaction(DatabaseIf db, List tableList, long transactionId,
List tabletCommitInfos, long timeoutMillis)
throws UserException;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
index 2522e2487ac349..75648634e3beac 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
@@ -194,7 +194,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
+ masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null, null);
}
@@ -219,7 +219,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
+ masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null, null);
}
@@ -246,8 +246,8 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
- transactionId, null, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, null, null);
});
}
@@ -278,7 +278,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
+ masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null, null);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 18f29ac30d2110..bf1b82019eb7c8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -358,7 +358,8 @@ public Collection getTabletDeleteInfo() {
new Expectations(globalTransactionMgr) {
{
try {
- globalTransactionMgr.commitTransaction(anyLong, (List) any, anyLong, (List) any, (TxnCommitAttachment) any);
+ globalTransactionMgr.commitTransactionWithoutLock(
+ anyLong, (List) any, anyLong, (List) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e3916cfb18d791..f1e9942c7e311a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -377,7 +377,8 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(@Mocked Env env, @In
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
- transactionMgr.commitTransaction(dbId, (List) any, transactionId, (List) any,
+ transactionMgr.commitTransactionWithoutLock(
+ dbId, (List) any, transactionId, (List) any,
(LoadJobFinalOperation) any);
}
};
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 82c79e9f6f00de..365cf8051d82e9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -144,7 +144,8 @@ public Map addTransactionToTransactionMgr() throws UserException {
CatalogTestUtil.testTabletId1, allBackends);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
Map> keyToSuccessTablets = new HashMap<>();
@@ -541,7 +542,8 @@ private void addSubTransaction() throws UserException {
setSuccessTablet(keyToSuccessTablets, allBackends, transactionId, CatalogTestUtil.testTabletId1, 14);
setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId2, CatalogTestUtil.testTabletId2, 13);
setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId4, CatalogTestUtil.testTabletId1, 15);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2, table1),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2, table1),
transactionState6.getTransactionId(),
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, transactionState6,
subTransactionInfos), 300000);
@@ -577,7 +579,8 @@ private void addSubTransaction() throws UserException {
new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(0)),
new SubTransactionInfo(table2, CatalogTestUtil.testTabletId2, allBackends, subTxnIds8.get(1)),
new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(2)));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
transactionState8.getTransactionId(),
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, transactionState8,
subTransactionInfos), 300000);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index c4ec468c651856..522021a97710d5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -177,8 +177,8 @@ public void testCommitTransaction() throws UserException {
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -209,8 +209,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2));
// commit txn
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
checkVersion(testTable1, CatalogTestUtil.testPartition1, CatalogTestUtil.testIndexId1,
CatalogTestUtil.testTabletId1, CatalogTestUtil.testStartVersion,
CatalogTestUtil.testStartVersion + 2,
@@ -240,8 +240,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId3));
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
- transactionId2, transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
TransactionState transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@@ -260,8 +260,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
// txn3: commit the second transaction with 1,2,3 success
if (true) {
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -342,7 +342,8 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@@ -407,7 +408,8 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so here they should be 0.
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
@@ -429,8 +431,8 @@ public void testFinishTransaction() throws UserException {
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
OlapTable testTable1 = (OlapTable) (masterEnv.getInternalCatalog()
.getDbOrMetaException(CatalogTestUtil.testDbId1).getTableOrMetaException(CatalogTestUtil.testTableId1));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
checkTableVersion(testTable1, 1, 2);
@@ -497,8 +499,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
// follower catalog replay the transaction
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
@@ -562,8 +564,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId3));
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
- transactionId2, transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
TransactionState transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1,
@@ -576,8 +578,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
// commit the second transaction with 1,2,3 success
if (true) {
List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -654,8 +656,8 @@ public void testCommitTransactionWithSubTxn() throws UserException {
subTransactionInfos);
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId,
- subTransactionStates, 300000);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
// check partition version
@@ -724,8 +726,8 @@ public void testCommitTransactionWithSubTxnAndOneFailed() throws UserException {
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
- transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId,
subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -767,7 +769,8 @@ public void testCommitTransactionWithSubTxnAndOneFailed() throws UserException {
try {
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
transactionId, subTransactionStates, 300000);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -803,7 +806,8 @@ public void testCommitTransactionWithSubTxnAndOneFailed() throws UserException {
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
transactionId, subTransactionStates, 300000);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
// check partition version
@@ -881,7 +885,8 @@ public void testCommitTransactionWithSubTxnAndReplicaFailed() throws UserExcepti
subTransactionInfos);
// commit txn
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1),
transactionId,
subTransactionStates, 300000);
} catch (TabletQuorumFailedException e) {
@@ -919,7 +924,8 @@ public void testFinishTransactionWithSubTransaction() throws UserException {
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId,
subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -1022,7 +1028,8 @@ public void testFinishTransactionWithSubTransactionAndOneFailed() throws UserExc
// commit txn
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList()));
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
transactionId, subTransactionStates, 300000);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 556668c8ef8186..6389e23bfa429c 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -867,6 +867,7 @@ struct TCommitTxnRequest {
// used for ccr
13: optional bool txn_insert
14: optional list sub_txn_infos
+ 15: optional bool only_commit // only commit txn, without waiting txn publish
}
struct TCommitTxnResult {