Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,14 @@ public void preCommitTransaction2PC(Database db, List<Table> tableList, long tra

@Deprecated
@Override
public void commitTransaction(long dbId, List<Table> tableList,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}

@Override
public void commitTransaction(long dbId, List<Table> tableList, long transactionId,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
Expand Down Expand Up @@ -1186,6 +1186,15 @@ private List<Table> getTablesNeedCommitLock(List<Table> tableList) {
}
}

@Override
public void commitTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
throws UserException {
// There is no publish in cloud mode
commitAndPublishTransaction(
db, tableList, transactionId, tabletCommitInfos, timeoutMillis, txnCommitAttachment);
}

@Override
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
Expand Down Expand Up @@ -1234,7 +1243,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
}

try {
commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
decreaseWaitingLockCount(tablesToLock);
MetaLockUtils.commitUnlockTables(tablesToLock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
Env.getCurrentGlobalTransactionMgr().commitTransaction(
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation());
afterLoadingTaskCommitTransaction(tableList);
afterCommit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ private void tryCommitJob() throws UserException {
try {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
.add("msg", "Load job try to commit txn").build());
Env.getCurrentGlobalTransactionMgr().commitTransaction(
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1854,14 +1854,23 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
subTxnInfo.getTabletCommitInfos(), null));
}
transactionState.setSubTxnIds(subTxnIds);
return Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db, request.getTxnId(),
subTransactionStates, timeoutMs);
} else {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, request.getTxnId(),
subTransactionStates, timeoutMs);
} else if (!request.isOnlyCommit()) {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
} else {
// single table commit, so don't need to wait for publish.
Env.getCurrentGlobalTransactionMgr()
.commitTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,11 @@ private void preCommitTransaction2PC(long dbId, List<Table> tableList, long tran
}

@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
@Override
public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}

@Override
Expand All @@ -234,7 +235,8 @@ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
public void commitTransaction(long dbId, List<Table> tableList, long transactionId,
@Override
public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
Expand All @@ -251,7 +253,7 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
/**
* @note callers should get all tables' write locks before call this api
*/
public void commitTransaction(long dbId, List<Table> tableList, long transactionId,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
List<SubTransactionState> subTransactionStates, long timeout) throws UserException {
if (Config.disable_load_job) {
throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
Expand All @@ -264,6 +266,21 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
dbTransactionMgr.commitTransaction(transactionId, tableList, subTransactionStates);
}

@Override
public void commitTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
throw new UserException("get tableList write lock timeout, tableList=("
+ StringUtils.join(tableList, ",") + ")");
}
try {
commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
}

private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
Expand Down Expand Up @@ -291,7 +308,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
+ StringUtils.join(tableList, ",") + ")");
}
try {
commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
Expand Down Expand Up @@ -320,7 +337,7 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
+ StringUtils.join(tableList, ",") + ")");
}
try {
commitTransaction(db.getId(), tableList, transactionId, subTransactionStates, timeoutMillis);
commitTransactionWithoutLock(db.getId(), tableList, transactionId, subTransactionStates, timeoutMillis);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
Expand Down Expand Up @@ -351,7 +368,7 @@ public void commitTransaction2PC(Database db, List<Table> tableList, long transa
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms."
+ " data will be visable later.", transactionId, stopWatch.getTime());
+ " data will be visible later.", transactionId, stopWatch.getTime());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,19 @@ public void preCommitTransaction2PC(Database db, List<Table> tableList, long tra
throws UserException;

@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException;

public void commitTransaction(long dbId, List<Table> tableList, long transactionId,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException;

public void commitTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment txnCommitAttachment)
throws UserException;

public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
throws UserException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null, null);
}

Expand All @@ -219,7 +219,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null, null);
}

Expand All @@ -246,8 +246,8 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null, null);
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, null, null);
});
}

Expand Down Expand Up @@ -278,7 +278,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
long transactionId = 123533;
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
masterTransMgr.commitTransactionWithoutLock(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
new Expectations(globalTransactionMgr) {
{
try {
globalTransactionMgr.commitTransaction(anyLong, (List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
globalTransactionMgr.commitTransactionWithoutLock(
anyLong, (List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(@Mocked Env env, @In
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
transactionMgr.commitTransaction(dbId, (List<Table>) any, transactionId, (List<TabletCommitInfo>) any,
transactionMgr.commitTransactionWithoutLock(
dbId, (List<Table>) any, transactionId, (List<TabletCommitInfo>) any,
(LoadJobFinalOperation) any);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public Map<String, Long> addTransactionToTransactionMgr() throws UserException {
CatalogTestUtil.testTabletId1, allBackends);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
Map<String, Map<Long, Long>> keyToSuccessTablets = new HashMap<>();
Expand Down Expand Up @@ -541,7 +542,8 @@ private void addSubTransaction() throws UserException {
setSuccessTablet(keyToSuccessTablets, allBackends, transactionId, CatalogTestUtil.testTabletId1, 14);
setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId2, CatalogTestUtil.testTabletId2, 13);
setSuccessTablet(keyToSuccessTablets, allBackends, subTxnId4, CatalogTestUtil.testTabletId1, 15);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2, table1),
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2, table1),
transactionState6.getTransactionId(),
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, transactionState6,
subTransactionInfos), 300000);
Expand Down Expand Up @@ -577,7 +579,8 @@ private void addSubTransaction() throws UserException {
new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(0)),
new SubTransactionInfo(table2, CatalogTestUtil.testTabletId2, allBackends, subTxnIds8.get(1)),
new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1, allBackends, subTxnIds8.get(2)));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(table1, table2),
transactionState8.getTransactionId(),
GlobalTransactionMgrTest.generateSubTransactionStates(masterTransMgr, transactionState8,
subTransactionInfos), 300000);
Expand Down
Loading
Loading