From bbe4f665891c8bb0ae5555773caf923f82baa09a Mon Sep 17 00:00:00 2001 From: walter Date: Thu, 13 Mar 2025 14:38:51 +0800 Subject: [PATCH] [improve](binlog) Allow commit txn without waiting txn publish (#48961) Related PR: https://github.com/selectdb/ccr-syncer/pull/502 Problem Summary: Commit and publish txn VS only commit txn: ![image](https://github.com/user-attachments/assets/9c87b365-3257-48e1-aa19-c69d0610e519) --- .../CloudGlobalTransactionMgr.java | 17 ++++-- .../doris/load/loadv2/BrokerLoadJob.java | 2 +- .../doris/load/loadv2/SparkLoadJob.java | 2 +- .../doris/service/FrontendServiceImpl.java | 15 ++++- .../transaction/GlobalTransactionMgr.java | 31 +++++++--- .../GlobalTransactionMgrIface.java | 9 ++- .../CloudGlobalTransactionMgrTest.java | 10 +-- .../apache/doris/load/DeleteHandlerTest.java | 3 +- .../doris/load/loadv2/SparkLoadJobTest.java | 3 +- .../DatabaseTransactionMgrTest.java | 9 ++- .../transaction/GlobalTransactionMgrTest.java | 61 +++++++++++-------- gensrc/thrift/FrontendService.thrift | 1 + 12 files changed, 108 insertions(+), 55 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index e00a2211419896..b526c5886e023e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -338,14 +338,14 @@ public void preCommitTransaction2PC(Database db, List tableList, long tra @Deprecated @Override - public void commitTransaction(long dbId, List
tableList, + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos) throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); + commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null); } @Override - public void commitTransaction(long dbId, List
tableList, long transactionId, + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { List mowTableList = getMowTableList(tableList, tabletCommitInfos); @@ -1186,6 +1186,15 @@ private List
getTablesNeedCommitLock(List
tableList) { } } + @Override + public void commitTransaction(DatabaseIf db, List
tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) + throws UserException { + // There is no publish in cloud mode + commitAndPublishTransaction( + db, tableList, transactionId, tabletCommitInfos, timeoutMillis, txnCommitAttachment); + } + @Override public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, long transactionId, List tabletCommitInfos, long timeoutMillis, @@ -1234,7 +1243,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, } try { - commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); } finally { decreaseWaitingLockCount(tablesToLock); MetaLockUtils.commitUnlockTables(tablesToLock); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 536a7267fdd90b..05d9d0c2accdeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -361,7 +361,7 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { .add("txn_id", transactionId) .add("msg", "Load job try to commit txn") .build()); - Env.getCurrentGlobalTransactionMgr().commitTransaction( + Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock( dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation()); afterLoadingTaskCommitTransaction(tableList); afterCommit(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index f01f205e96dc0d..1abc8b76b544ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -671,7 +671,7 @@ private void tryCommitJob() throws UserException { try { LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId) .add("msg", "Load job try to commit txn").build()); - Env.getCurrentGlobalTransactionMgr().commitTransaction( + Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock( dbId, tableList, transactionId, commitInfos, new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, finishTimestamp, state, failMsg)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 38d81d64bfa650..381ea3bbbf77c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1854,14 +1854,23 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException { subTxnInfo.getTabletCommitInfos(), null)); } transactionState.setSubTxnIds(subTxnIds); - return Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db, request.getTxnId(), - subTransactionStates, timeoutMs); - } else { + return Env.getCurrentGlobalTransactionMgr() + .commitAndPublishTransaction(db, request.getTxnId(), + subTransactionStates, timeoutMs); + } else if (!request.isOnlyCommit()) { return Env.getCurrentGlobalTransactionMgr() .commitAndPublishTransaction(db, tableList, request.getTxnId(), TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + } else { + // single table commit, so don't need to wait for publish. + Env.getCurrentGlobalTransactionMgr() + .commitTransaction(db, tableList, + request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index ff1115bf0a6990..6552c65d412464 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -215,10 +215,11 @@ private void preCommitTransaction2PC(long dbId, List
tableList, long tran } @Deprecated - public void commitTransaction(long dbId, List
tableList, + @Override + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos) throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); + commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null); } @Override @@ -234,7 +235,8 @@ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) { * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time * @note callers should get all tables' write locks before call this api */ - public void commitTransaction(long dbId, List
tableList, long transactionId, + @Override + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { if (Config.disable_load_job) { @@ -251,7 +253,7 @@ public void commitTransaction(long dbId, List
tableList, long transaction /** * @note callers should get all tables' write locks before call this api */ - public void commitTransaction(long dbId, List
tableList, long transactionId, + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List subTransactionStates, long timeout) throws UserException { if (Config.disable_load_job) { throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); @@ -264,6 +266,21 @@ public void commitTransaction(long dbId, List
tableList, long transaction dbTransactionMgr.commitTransaction(transactionId, tableList, subTransactionStates); } + @Override + public void commitTransaction(DatabaseIf db, List
tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) + throws UserException { + if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) { + throw new UserException("get tableList write lock timeout, tableList=(" + + StringUtils.join(tableList, ",") + ")"); + } + try { + commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } + } + private void commitTransaction2PC(long dbId, long transactionId) throws UserException { if (Config.disable_load_job) { @@ -291,7 +308,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, + StringUtils.join(tableList, ",") + ")"); } try { - commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); } finally { MetaLockUtils.writeUnlockTables(tableList); } @@ -320,7 +337,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, + StringUtils.join(tableList, ",") + ")"); } try { - commitTransaction(db.getId(), tableList, transactionId, subTransactionStates, timeoutMillis); + commitTransactionWithoutLock(db.getId(), tableList, transactionId, subTransactionStates, timeoutMillis); } finally { MetaLockUtils.writeUnlockTables(tableList); } @@ -351,7 +368,7 @@ public void commitTransaction2PC(Database db, List
tableList, long transa } stopWatch.stop(); LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms." - + " data will be visable later.", transactionId, stopWatch.getTime()); + + " data will be visible later.", transactionId, stopWatch.getTime()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java index c8d436e68d8356..38c8cdd5a6d07e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java @@ -78,14 +78,19 @@ public void preCommitTransaction2PC(Database db, List
tableList, long tra throws UserException; @Deprecated - public void commitTransaction(long dbId, List
tableList, + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos) throws UserException; - public void commitTransaction(long dbId, List
tableList, long transactionId, + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException; + public void commitTransaction(DatabaseIf db, List
tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis, + TxnCommitAttachment txnCommitAttachment) + throws UserException; + public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, long transactionId, List tabletCommitInfos, long timeoutMillis) throws UserException; diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java index 2522e2487ac349..75648634e3beac 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java @@ -194,7 +194,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) { long transactionId = 123533; Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), + masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, null, null); } @@ -219,7 +219,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) { long transactionId = 123533; Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), + masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, null, null); } @@ -246,8 +246,8 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) { long transactionId = 123533; Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), - transactionId, null, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, null, null); }); } @@ -278,7 +278,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) { long transactionId = 123533; Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), + masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, null, null); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java index 18f29ac30d2110..bf1b82019eb7c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java @@ -358,7 +358,8 @@ public Collection getTabletDeleteInfo() { new Expectations(globalTransactionMgr) { { try { - globalTransactionMgr.commitTransaction(anyLong, (List
) any, anyLong, (List) any, (TxnCommitAttachment) any); + globalTransactionMgr.commitTransactionWithoutLock( + anyLong, (List
) any, anyLong, (List) any, (TxnCommitAttachment) any); } catch (UserException e) { // CHECKSTYLE IGNORE THIS LINE } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index e3916cfb18d791..f1e9942c7e311a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -377,7 +377,8 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(@Mocked Env env, @In AgentTaskExecutor.submit((AgentBatchTask) any); Env.getCurrentGlobalTransactionMgr(); result = transactionMgr; - transactionMgr.commitTransaction(dbId, (List
) any, transactionId, (List) any, + transactionMgr.commitTransactionWithoutLock( + dbId, (List
) any, transactionId, (List) any, (LoadJobFinalOperation) any); } }; diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 82c79e9f6f00de..365cf8051d82e9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -144,7 +144,8 @@ public Map addTransactionToTransactionMgr() throws UserException { CatalogTestUtil.testTabletId1, allBackends); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets, null); TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1); Map> keyToSuccessTablets = new HashMap<>(); @@ -541,7 +542,8 @@ private void addSubTransaction() throws UserException { setSuccessTablet(keyToSuccessTablets, allBackends, transactionId, CatalogTestUtil.testTabletId1, 14); setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId2, CatalogTestUtil.testTabletId2, 13); setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId4, CatalogTestUtil.testTabletId1, 15); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2, table1), + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2, table1), transactionState6.getTransactionId(), GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, transactionState6, subTransactionInfos), 300000); @@ -577,7 +579,8 @@ private void addSubTransaction() throws UserException { new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(0)), new SubTransactionInfo(table2, CatalogTestUtil.testTabletId2, allBackends, subTxnIds8.get(1)), new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(2))); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionState8.getTransactionId(), GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, transactionState8, subTransactionInfos), 300000); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index c4ec468c651856..522021a97710d5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -177,8 +177,8 @@ public void testCommitTransaction() throws UserException { List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -209,8 +209,8 @@ public void testCommitTransactionWithOneFailed() throws UserException { List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2)); // commit txn - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); checkVersion(testTable1, CatalogTestUtil.testPartition1, CatalogTestUtil.testIndexId1, CatalogTestUtil.testTabletId1, CatalogTestUtil.testStartVersion, CatalogTestUtil.testStartVersion + 2, @@ -240,8 +240,8 @@ public void testCommitTransactionWithOneFailed() throws UserException { List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId3)); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), - transactionId2, transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { TransactionState transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2); @@ -260,8 +260,8 @@ public void testCommitTransactionWithOneFailed() throws UserException { // txn3: commit the second transaction with 1,2,3 success if (true) { List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId2); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -342,7 +342,8 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); + masterTransMgr.commitTransactionWithoutLock( + 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows")); @@ -407,7 +408,8 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); + masterTransMgr.commitTransactionWithoutLock( + 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); // current total rows and error rows will be reset after job pause, so here they should be 0. RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); @@ -429,8 +431,8 @@ public void testFinishTransaction() throws UserException { List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends); OlapTable testTable1 = (OlapTable) (masterEnv.getInternalCatalog() .getDbOrMetaException(CatalogTestUtil.testDbId1).getTableOrMetaException(CatalogTestUtil.testTableId1)); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); checkTableVersion(testTable1, 1, 2); @@ -497,8 +499,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second); List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2)); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); // follower catalog replay the transaction TransactionState transactionState = fakeEditLog.getTransaction(transactionId); @@ -562,8 +564,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId3)); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), - transactionId2, transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { TransactionState transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, @@ -576,8 +578,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { // commit the second transaction with 1,2,3 success if (true) { List transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId2); // check status is commit Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -654,8 +656,8 @@ public void testCommitTransactionWithSubTxn() throws UserException { subTransactionInfos); transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList())); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, - subTransactionStates, 300000); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, subTransactionStates, 300000); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); // check partition version @@ -724,8 +726,8 @@ public void testCommitTransactionWithSubTxnAndOneFailed() throws UserException { // commit txn transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList())); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), - transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, subTransactionStates, 300000); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -767,7 +769,8 @@ public void testCommitTransactionWithSubTxnAndOneFailed() throws UserException { try { transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList())); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, subTransactionStates, 300000); Assert.fail(); } catch (TabletQuorumFailedException e) { @@ -803,7 +806,8 @@ public void testCommitTransactionWithSubTxnAndOneFailed() throws UserException { // commit txn transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList())); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, subTransactionStates, 300000); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); // check partition version @@ -881,7 +885,8 @@ public void testCommitTransactionWithSubTxnAndReplicaFailed() throws UserExcepti subTransactionInfos); // commit txn try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1), + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1), transactionId, subTransactionStates, 300000); } catch (TabletQuorumFailedException e) { @@ -919,7 +924,8 @@ public void testFinishTransactionWithSubTransaction() throws UserException { // commit txn transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList())); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, subTransactionStates, 300000); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -1022,7 +1028,8 @@ public void testFinishTransactionWithSubTransactionAndOneFailed() throws UserExc // commit txn transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList())); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2), transactionId, subTransactionStates, 300000); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 556668c8ef8186..6389e23bfa429c 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -867,6 +867,7 @@ struct TCommitTxnRequest { // used for ccr 13: optional bool txn_insert 14: optional list sub_txn_infos + 15: optional bool only_commit // only commit txn, without waiting txn publish } struct TCommitTxnResult {