Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
tableList.stream().map(Table::getId).collect(Collectors.toList()));
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(dbId, transactionId)) {
return;
}
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
if (lockContext.getBackendToPartitionTablets().isEmpty()) {
Expand All @@ -374,6 +377,40 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
}
}

private boolean checkTransactionStateBeforeCommit(long dbId, long transactionId)
throws TransactionCommitFailedException {
// if this txn has been calculated by previously task but commit rpc is timeout,
// be will send another commit request to fe, so if txn is committed or visible,
// no need to calculate delete bitmap again, just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (transactionState == null) {
throw new TransactionCommitFailedException("txn does not exist: " + transactionId);
}
if (null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] is already aborted. abort reason: " + transactionState.getReason());
} else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ",
transactionId,
transactionState.getTransactionStatus().toString());
return false;
} else if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
return true;
} else {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] status is: " + transactionState.getTransactionStatus().toString());
}
} else {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] status is null ");
}
}

/**
* Post process of commitTxn
* 1. update some stats
Expand Down Expand Up @@ -499,23 +536,6 @@ private void commitTransactionWithoutLock(long dbId, List<Table> tableList, long
}

if (!mowTableList.isEmpty()) {
// may be this txn has been calculated by previously task but commit rpc is timeout,
// and be will send another commit request to fe, so need to check txn status first
// before sending delete bitmap task to be, if txn is committed or visible, no need to
// calculate delete bitmap again, just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (null != transactionState && null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId,
transactionState.getTransactionStatus().toString());
return;
} else {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
}
}
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos);
}

Expand Down Expand Up @@ -1162,6 +1182,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
throws UserException {
List<OlapTable> mowTableList = getMowTableList(tableList, null);
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) {
return;
}
}
commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// under the License.

suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
if (!isCloudMode()) {
return
}
GetDebugPoint().clearDebugPointsForAllFEs()

def backendId_to_backendIP = [:]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {
}
}
qt_sql """ select * from ${tableName} order by id"""
def now = System.currentTimeMillis()
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())

}
}
def time_diff = System.currentTimeMillis() - now
assertTrue(time_diff < 10000)
} finally {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
sql "DROP TABLE IF EXISTS ${tableName};"
Expand Down
Loading