diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 3b8428a4351795..1b76ce42bc0ab5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -321,7 +321,7 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 9aac4b36557345..f7accb126e726d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -653,7 +653,7 @@ private void tryCommitJob() throws UserException {
Lists.newArrayList(tableToLoadPartitions.keySet()));
MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7c1b6a45b4cb70..d48418c7f25cbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1747,11 +1747,21 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
// Step 5: commit and publish
- return Env.getCurrentGlobalTransactionMgr()
- .commitAndPublishTransaction(db, tableList,
- request.getTxnId(),
- TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
- TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ if (!request.isOnlyCommit()) {
+ return Env.getCurrentGlobalTransactionMgr()
+ .commitAndPublishTransaction(db, tableList,
+ request.getTxnId(),
+ TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+ TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ } else {
+ // single table commit, so don't need to wait for publish.
+ Env.getCurrentGlobalTransactionMgr()
+ .commitTransaction(db, tableList,
+ request.getTxnId(),
+ TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+ TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ return true;
+ }
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 15d6daf30d6917..3bee819a1c861c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -226,10 +226,10 @@ private void preCommitTransaction2PC(long dbId, List
tableList, long tran
}
@Deprecated
- public void commitTransaction(long dbId, List tableList,
+ public void commitTransactionWithoutLock(long dbId, List tableList,
long transactionId, List tabletCommitInfos)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
+ commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}
/**
@@ -241,7 +241,7 @@ public void commitTransaction(long dbId, List tableList,
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
- public void commitTransaction(long dbId, List tableList, long transactionId,
+ public void commitTransactionWithoutLock(long dbId, List tableList, long transactionId,
List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@@ -255,6 +255,20 @@ public void commitTransaction(long dbId, List tableList, long transaction
dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
}
+ public void commitTransaction(DatabaseIf db, List tableList, long transactionId,
+ List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
+ throw new UserException("get tableList write lock timeout, tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+ }
+
private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
@@ -282,7 +296,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List tableList,
+ StringUtils.join(tableList, ",") + ")");
}
try {
- commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -313,7 +327,7 @@ public void commitTransaction2PC(Database db, List tableList, long transa
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms."
- + " data will be visable later.", transactionId, stopWatch.getTime());
+ + " data will be visible later.", transactionId, stopWatch.getTime());
}
public void abortTransaction(long dbId, long transactionId, String reason) throws UserException {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 18f29ac30d2110..bf1b82019eb7c8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -358,7 +358,8 @@ public Collection getTabletDeleteInfo() {
new Expectations(globalTransactionMgr) {
{
try {
- globalTransactionMgr.commitTransaction(anyLong, (List) any, anyLong, (List) any, (TxnCommitAttachment) any);
+ globalTransactionMgr.commitTransactionWithoutLock(
+ anyLong, (List) any, anyLong, (List) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e3916cfb18d791..f1e9942c7e311a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -377,7 +377,8 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(@Mocked Env env, @In
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
- transactionMgr.commitTransaction(dbId, (List) any, transactionId, (List) any,
+ transactionMgr.commitTransactionWithoutLock(
+ dbId, (List) any, transactionId, (List) any,
(LoadJobFinalOperation) any);
}
};
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 5f182ef50b7aed..76ae778153d956 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -111,7 +111,8 @@ public Map addTransactionToTransactionMgr() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 7910df48be2eb9..2779a5d5107792 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -172,8 +172,8 @@ public void testCommitTransaction1() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -213,7 +213,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
@@ -235,7 +236,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -265,7 +267,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
@@ -366,7 +369,8 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@@ -439,7 +443,8 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so here they should be 0.
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
@@ -471,7 +476,8 @@ public void testFinishTransaction() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -537,7 +543,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
@@ -607,7 +614,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -624,7 +632,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 3018924064b8a4..210f1ada0a9af3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -824,6 +824,8 @@ struct TCommitTxnRequest {
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
12: optional i64 db_id
+ // used for ccr
+ 15: optional bool only_commit // only commit txn, without waiting txn publish
}
struct TCommitTxnResult {