Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
Env.getCurrentGlobalTransactionMgr().commitTransaction(
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ private void tryCommitJob() throws UserException {
Lists.newArrayList(tableToLoadPartitions.keySet()));
MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
Env.getCurrentGlobalTransactionMgr().commitTransaction(
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1747,11 +1747,21 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;

// Step 5: commit and publish
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
if (!request.isOnlyCommit()) {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
} else {
// single table commit, so don't need to wait for publish.
Env.getCurrentGlobalTransactionMgr()
.commitTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
return true;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ private void preCommitTransaction2PC(long dbId, List<Table> tableList, long tran
}

@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}

/**
Expand All @@ -241,7 +241,7 @@ public void commitTransaction(long dbId, List<Table> tableList,
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
public void commitTransaction(long dbId, List<Table> tableList, long transactionId,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
Expand All @@ -255,6 +255,20 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
}

public void commitTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
throw new UserException("get tableList write lock timeout, tableList=("
+ StringUtils.join(tableList, ",") + ")");
}
try {
commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
}

private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
Expand Down Expand Up @@ -282,7 +296,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
+ StringUtils.join(tableList, ",") + ")");
}
try {
commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
Expand Down Expand Up @@ -313,7 +327,7 @@ public void commitTransaction2PC(Database db, List<Table> tableList, long transa
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms."
+ " data will be visable later.", transactionId, stopWatch.getTime());
+ " data will be visible later.", transactionId, stopWatch.getTime());
}

public void abortTransaction(long dbId, long transactionId, String reason) throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
new Expectations(globalTransactionMgr) {
{
try {
globalTransactionMgr.commitTransaction(anyLong, (List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
globalTransactionMgr.commitTransactionWithoutLock(
anyLong, (List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(@Mocked Env env, @In
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
transactionMgr.commitTransaction(dbId, (List<Table>) any, transactionId, (List<TabletCommitInfo>) any,
transactionMgr.commitTransactionWithoutLock(
dbId, (List<Table>) any, transactionId, (List<TabletCommitInfo>) any,
(LoadJobFinalOperation) any);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public Map<String, Long> addTransactionToTransactionMgr() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public void testCommitTransaction1() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down Expand Up @@ -213,7 +213,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);

// follower catalog replay the transaction
Expand All @@ -235,7 +236,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
Expand Down Expand Up @@ -265,7 +267,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Expand Down Expand Up @@ -366,7 +369,8 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
masterTransMgr.commitTransactionWithoutLock(
1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");

Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
Expand Down Expand Up @@ -439,7 +443,8 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
masterTransMgr.commitTransactionWithoutLock(
1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);

// current total rows and error rows will be reset after job pause, so here they should be 0.
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Expand Down Expand Up @@ -471,7 +476,8 @@ public void testFinishTransaction() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down Expand Up @@ -537,7 +543,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);

// follower catalog replay the transaction
Expand Down Expand Up @@ -607,7 +614,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
Expand All @@ -624,7 +632,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ struct TCommitTxnRequest {
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
12: optional i64 db_id
// used for ccr
15: optional bool only_commit // only commit txn, without waiting txn publish
}

struct TCommitTxnResult {
Expand Down
Loading