From 1b90512f4eeb9e98e63fe3d3b1a125c9b8792d52 Mon Sep 17 00:00:00 2001 From: walter Date: Thu, 13 Mar 2025 14:38:51 +0800 Subject: [PATCH 1/2] [improve](binlog) Allow commit txn without waiting txn publish (#48961) Related PR: https://github.com/selectdb/ccr-syncer/pull/502 Problem Summary: Commit and publish txn VS only commit txn: ![image](https://github.com/user-attachments/assets/9c87b365-3257-48e1-aa19-c69d0610e519) --- .../doris/load/loadv2/BrokerLoadJob.java | 2 +- .../doris/load/loadv2/SparkLoadJob.java | 2 +- .../doris/service/FrontendServiceImpl.java | 20 +++++++++--- .../transaction/GlobalTransactionMgr.java | 27 +++++++++++++--- .../apache/doris/load/DeleteHandlerTest.java | 3 +- .../doris/load/loadv2/SparkLoadJobTest.java | 3 +- .../DatabaseTransactionMgrTest.java | 3 +- .../transaction/GlobalTransactionMgrTest.java | 31 ++++++++++++------- gensrc/thrift/FrontendService.thrift | 2 ++ 9 files changed, 67 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 3b8428a4351795..1b76ce42bc0ab5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -321,7 +321,7 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { .add("txn_id", transactionId) .add("msg", "Load job try to commit txn") .build()); - Env.getCurrentGlobalTransactionMgr().commitTransaction( + Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock( dbId, tableList, transactionId, commitInfos, new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, finishTimestamp, state, failMsg)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 9aac4b36557345..f7accb126e726d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -653,7 +653,7 @@ private void tryCommitJob() throws UserException { Lists.newArrayList(tableToLoadPartitions.keySet())); MetaLockUtils.writeLockTablesOrMetaException(tableList); try { - Env.getCurrentGlobalTransactionMgr().commitTransaction( + Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock( dbId, tableList, transactionId, commitInfos, new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, finishTimestamp, state, failMsg)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7c1b6a45b4cb70..d48418c7f25cbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1747,11 +1747,21 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException { long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000; // Step 5: commit and publish - return Env.getCurrentGlobalTransactionMgr() - .commitAndPublishTransaction(db, tableList, - request.getTxnId(), - TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, - TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + if (!request.isOnlyCommit()) { + return Env.getCurrentGlobalTransactionMgr() + .commitAndPublishTransaction(db, tableList, + request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + } else { + // single table commit, so don't need to wait for publish. + Env.getCurrentGlobalTransactionMgr() + .commitTransaction(db, tableList, + request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + return true; + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 15d6daf30d6917..7a16a8dccc3fb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -226,10 +226,11 @@ private void preCommitTransaction2PC(long dbId, List tableList, long tran } @Deprecated - public void commitTransaction(long dbId, List
tableList, + @Override + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos) throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); + commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null); } /** @@ -241,7 +242,8 @@ public void commitTransaction(long dbId, List
tableList, * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time * @note callers should get all tables' write locks before call this api */ - public void commitTransaction(long dbId, List
tableList, long transactionId, + @Override + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { if (Config.disable_load_job) { @@ -255,6 +257,21 @@ public void commitTransaction(long dbId, List
tableList, long transaction dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); } + @Override + public void commitTransaction(DatabaseIf db, List
tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) + throws UserException { + if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) { + throw new UserException("get tableList write lock timeout, tableList=(" + + StringUtils.join(tableList, ",") + ")"); + } + try { + commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } + } + private void commitTransaction2PC(long dbId, long transactionId) throws UserException { if (Config.disable_load_job) { @@ -282,7 +299,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, + StringUtils.join(tableList, ",") + ")"); } try { - commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); } finally { MetaLockUtils.writeUnlockTables(tableList); } @@ -313,7 +330,7 @@ public void commitTransaction2PC(Database db, List
tableList, long transa } stopWatch.stop(); LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms." - + " data will be visable later.", transactionId, stopWatch.getTime()); + + " data will be visible later.", transactionId, stopWatch.getTime()); } public void abortTransaction(long dbId, long transactionId, String reason) throws UserException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java index 18f29ac30d2110..bf1b82019eb7c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java @@ -358,7 +358,8 @@ public Collection getTabletDeleteInfo() { new Expectations(globalTransactionMgr) { { try { - globalTransactionMgr.commitTransaction(anyLong, (List
) any, anyLong, (List) any, (TxnCommitAttachment) any); + globalTransactionMgr.commitTransactionWithoutLock( + anyLong, (List
) any, anyLong, (List) any, (TxnCommitAttachment) any); } catch (UserException e) { // CHECKSTYLE IGNORE THIS LINE } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index e3916cfb18d791..f1e9942c7e311a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -377,7 +377,8 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(@Mocked Env env, @In AgentTaskExecutor.submit((AgentBatchTask) any); Env.getCurrentGlobalTransactionMgr(); result = transactionMgr; - transactionMgr.commitTransaction(dbId, (List
) any, transactionId, (List) any, + transactionMgr.commitTransactionWithoutLock( + dbId, (List
) any, transactionId, (List) any, (LoadJobFinalOperation) any); } }; diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 5f182ef50b7aed..76ae778153d956 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -111,7 +111,8 @@ public Map addTransactionToTransactionMgr() throws UserException { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets, null); TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1); setTransactionFinishPublish(transactionState1, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 7910df48be2eb9..2779a5d5107792 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -172,8 +172,8 @@ public void testCommitTransaction1() throws UserException { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -213,7 +213,8 @@ public void testCommitTransactionWithOneFailed() throws UserException { transTablets.add(tabletCommitInfo2); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); // follower catalog replay the transaction @@ -235,7 +236,8 @@ public void testCommitTransactionWithOneFailed() throws UserException { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { @@ -265,7 +267,8 @@ public void testCommitTransactionWithOneFailed() throws UserException { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); transactionState = fakeEditLog.getTransaction(transactionId2); // check status is commit @@ -366,7 +369,8 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); + masterTransMgr.commitTransactionWithoutLock( + 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows")); @@ -439,7 +443,8 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); + masterTransMgr.commitTransactionWithoutLock( + 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); // current total rows and error rows will be reset after job pause, so here they should be 0. RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); @@ -471,7 +476,8 @@ public void testFinishTransaction() throws UserException { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -537,7 +543,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { transTablets.add(tabletCommitInfo2); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); // follower catalog replay the transaction @@ -607,7 +614,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { @@ -624,7 +632,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); transactionState = fakeEditLog.getTransaction(transactionId2); // check status is commit diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 3018924064b8a4..210f1ada0a9af3 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -824,6 +824,8 @@ struct TCommitTxnRequest { 10: optional i64 thrift_rpc_timeout_ms 11: optional string token 12: optional i64 db_id + // used for ccr + 15: optional bool only_commit // only commit txn, without waiting txn publish } struct TCommitTxnResult { From a95282e2315a895d22e26072b560b7b7c38ac89e Mon Sep 17 00:00:00 2001 From: w41ter Date: Thu, 20 Mar 2025 06:47:55 +0000 Subject: [PATCH 2/2] fixup --- .../org/apache/doris/transaction/GlobalTransactionMgr.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 7a16a8dccc3fb6..3bee819a1c861c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -226,7 +226,6 @@ private void preCommitTransaction2PC(long dbId, List
tableList, long tran } @Deprecated - @Override public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos) throws UserException { @@ -242,7 +241,6 @@ public void commitTransactionWithoutLock(long dbId, List
tableList, * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time * @note callers should get all tables' write locks before call this api */ - @Override public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { @@ -257,7 +255,6 @@ public void commitTransactionWithoutLock(long dbId, List
tableList, long dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); } - @Override public void commitTransaction(DatabaseIf db, List
tableList, long transactionId, List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) throws UserException {