Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,23 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio

List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
if (!mowTableList.isEmpty()) {
// may be this txn has been calculated by previously task but commit rpc is timeout,
// and be will send another commit request to fe, so need to check txn status first
// before sending delete bitmap task to be, if txn is committed or visible, no need to
// calculate delete bitmap again, just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (null != transactionState && null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId,
transactionState.getTransactionStatus().toString());
return;
} else {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
}
}
calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos);
}

Expand Down Expand Up @@ -519,6 +536,10 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
try {
txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
txnOperated = true;
if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout")) {
throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
"test delete bitmap update lock timeout, transactionId:" + transactionId);
}
} finally {
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
5 e 90
6 f 100

-- !sql --
5 e 90
6 f 100

Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {

// store the original value
get_be_param("mow_stream_load_commit_retry_times")
// disable retry to make this problem more clear
set_be_param("mow_stream_load_commit_retry_times", "1")


def tableName = "tbl_basic"
// test fe release lock when calculating delete bitmap timeout
setFeConfigTemporary(customFeConfig) {
try {
// disable retry to make this problem more clear
set_be_param("mow_stream_load_commit_retry_times", "1")
// create table
sql """ drop table if exists ${tableName}; """

Expand Down Expand Up @@ -143,4 +143,47 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {
}

}

//test fe don't send calculating delete bitmap task to be twice when txn is committed or visible
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
try {
// create table
sql """ drop table if exists ${tableName}; """

sql """
CREATE TABLE `${tableName}` (
`id` int(11) NOT NULL,
`name` varchar(1100) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_num" = "1"
);
"""
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
}
}
qt_sql """ select * from ${tableName} order by id"""
} finally {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
sql "DROP TABLE IF EXISTS ${tableName};"
GetDebugPoint().clearDebugPointsForAllFEs()
}

}