diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 11a3f05ead70c4..d6ffafb33aa567 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -346,7 +346,29 @@ public void commitTransaction(long dbId, List
tableList,
public void commitTransaction(long dbId, List tableList, long transactionId,
List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
+ List mowTableList = getMowTableList(tableList, tabletCommitInfos);
+ try {
+ LOG.info("try to commit transaction, transactionId: {}", transactionId);
+ Map> backendToPartitionInfos = null;
+ if (!mowTableList.isEmpty()) {
+ DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
+ getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
+ if (lockContext.getBackendToPartitionTablets().isEmpty()) {
+ throw new UserException(
+ "The partition info is empty, table may be dropped, txnid=" + transactionId);
+ }
+ backendToPartitionInfos = getCalcDeleteBitmapInfo(lockContext, null);
+ }
+ commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false,
+ mowTableList, backendToPartitionInfos);
+ } catch (Exception e) {
+ if (!mowTableList.isEmpty()) {
+ LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId,
+ e.getMessage());
+ removeDeleteBitmapUpdateLock(mowTableList, transactionId);
+ }
+ throw e;
+ }
}
/**
@@ -464,17 +486,15 @@ private Set getBaseTabletsFromTables(List tableList, List tableList, long transactionId,
- List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC)
+ private void commitTransactionWithoutLock(long dbId, List tableList, long transactionId,
+ List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC,
+ List mowTableList, Map> backendToPartitionInfos)
throws UserException {
-
- LOG.info("try to commit transaction, transactionId: {}", transactionId);
if (Config.disable_load_job) {
throw new TransactionCommitFailedException(
"disable_load_job is set to true, all load jobs are not allowed");
}
- List mowTableList = getMowTableList(tableList, tabletCommitInfos);
if (!mowTableList.isEmpty()) {
// may be this txn has been calculated by previously task but commit rpc is timeout,
// and be will send another commit request to fe, so need to check txn status first
@@ -493,7 +513,8 @@ private void commitTransaction(long dbId, List tableList, long transactio
transactionState.getTransactionStatus().toString());
}
}
- calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, null);
+ sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
+ Config.calculate_delete_bitmap_task_timeout_seconds);
}
CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
@@ -535,6 +556,10 @@ private void commitTransaction(long dbId, List tableList, long transactio
private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC,
TxnCommitAttachment txnCommitAttachment) throws UserException {
+ if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
+ LOG.info("debug point FE.mow.commit.exception, throw e");
+ throw new UserException("debug point FE.mow.commit.exception");
+ }
boolean txnOperated = false;
TransactionState txnState = null;
TxnStateChangeCallback cb = null;
@@ -653,43 +678,6 @@ private List getMowTableList(List tableList, List tableList, long transactionId,
- List tabletCommitInfos, List subTransactionStates)
- throws UserException {
- Map>> backendToPartitionTablets = Maps.newHashMap();
- Map partitions = Maps.newHashMap();
- Map> tableToPartitions = Maps.newHashMap();
- Map> tableToTabletList = Maps.newHashMap();
- Map tabletToTabletMeta = Maps.newHashMap();
- getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, partitions, backendToPartitionTablets,
- tableToTabletList, tabletToTabletMeta);
- if (backendToPartitionTablets.isEmpty()) {
- throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId);
- }
-
- Map> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates, tableToTabletList,
- tabletToTabletMeta);
- Map baseCompactionCnts = Maps.newHashMap();
- Map cumulativeCompactionCnts = Maps.newHashMap();
- Map cumulativePoints = Maps.newHashMap();
- getDeleteBitmapUpdateLock(tableToPartitions, transactionId, tableToTabletList, tabletToTabletMeta,
- baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints);
- Map partitionVersions = getPartitionVersions(partitions);
-
- Map> backendToPartitionInfos = getCalcDeleteBitmapInfo(
- backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts,
- cumulativePoints, partitionToSubTxnIds);
- try {
- sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
- subTransactionStates == null ? Config.calculate_delete_bitmap_task_timeout_seconds
- : Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
- } catch (UserException e) {
- LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage());
- removeDeleteBitmapUpdateLock(tableToPartitions, transactionId);
- throw e;
- }
- }
-
private Map> getPartitionSubTxnIds(List subTransactionStates,
Map> tableToTabletList, Map tabletToTabletMeta) {
if (subTransactionStates == null) {
@@ -715,11 +703,7 @@ private Map> getPartitionSubTxnIds(List su
private void getPartitionInfo(List tableList,
List tabletCommitInfos,
- Map> tableToParttions,
- Map partitions,
- Map>> backendToPartitionTablets,
- Map> tableToTabletList,
- Map tabletToTabletMeta) {
+ DeleteBitmapUpdateLockContext lockContext) {
Map tableMap = Maps.newHashMap();
for (OlapTable olapTable : tableList) {
tableMap.put(olapTable.getId(), olapTable);
@@ -731,7 +715,7 @@ private void getPartitionInfo(List tableList,
List tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds);
for (int i = 0; i < tabletMetaList.size(); i++) {
long tabletId = tabletIds.get(i);
- if (tabletToTabletMeta.containsKey(tabletId)) {
+ if (lockContext.getTabletToTabletMeta().containsKey(tabletId)) {
continue;
}
TabletMeta tabletMeta = tabletMetaList.get(i);
@@ -740,9 +724,10 @@ private void getPartitionInfo(List tableList,
continue;
}
- tabletToTabletMeta.put(tabletId, tabletMeta);
+ lockContext.getTabletToTabletMeta().put(tabletId, tabletMeta);
- List tableTabletIds = tableToTabletList.computeIfAbsent(tableId, k -> Lists.newArrayList());
+ List tableTabletIds = lockContext.getTableToTabletList()
+ .computeIfAbsent(tableId, k -> Lists.newArrayList());
if (!tableTabletIds.contains(tabletId)) {
tableTabletIds.add(tabletId);
}
@@ -750,20 +735,20 @@ private void getPartitionInfo(List tableList,
long partitionId = tabletMeta.getPartitionId();
long backendId = tabletCommitInfos.get(i).getBackendId();
- if (!tableToParttions.containsKey(tableId)) {
- tableToParttions.put(tableId, Sets.newHashSet());
+ if (!lockContext.getTableToPartitions().containsKey(tableId)) {
+ lockContext.getTableToPartitions().put(tableId, Sets.newHashSet());
}
- tableToParttions.get(tableId).add(partitionId);
+ lockContext.getTableToPartitions().get(tableId).add(partitionId);
- if (!backendToPartitionTablets.containsKey(backendId)) {
- backendToPartitionTablets.put(backendId, Maps.newHashMap());
+ if (!lockContext.getBackendToPartitionTablets().containsKey(backendId)) {
+ lockContext.getBackendToPartitionTablets().put(backendId, Maps.newHashMap());
}
- Map> partitionToTablets = backendToPartitionTablets.get(backendId);
+ Map> partitionToTablets = lockContext.getBackendToPartitionTablets().get(backendId);
if (!partitionToTablets.containsKey(partitionId)) {
partitionToTablets.put(partitionId, Sets.newHashSet());
}
partitionToTablets.get(partitionId).add(tabletId);
- partitions.putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId));
+ lockContext.getPartitions().putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId));
}
}
@@ -778,11 +763,10 @@ private Map getPartitionVersions(Map partitionMap)
}
private Map> getCalcDeleteBitmapInfo(
- Map>> backendToPartitionTablets, Map partitionVersions,
- Map baseCompactionCnts, Map cumulativeCompactionCnts,
- Map cumulativePoints, Map> partitionToSubTxnIds) {
+ DeleteBitmapUpdateLockContext lockContext, Map> partitionToSubTxnIds) {
Map> backendToPartitionInfos = Maps.newHashMap();
- for (Map.Entry>> entry : backendToPartitionTablets.entrySet()) {
+ Map partitionVersions = getPartitionVersions(lockContext.getPartitions());
+ for (Map.Entry>> entry : lockContext.getBackendToPartitionTablets().entrySet()) {
List partitionInfos = Lists.newArrayList();
for (Map.Entry> partitionToTablets : entry.getValue().entrySet()) {
Long partitionId = partitionToTablets.getKey();
@@ -790,15 +774,16 @@ private Map> getCalcDeleteBitmapInfo(
TCalcDeleteBitmapPartitionInfo partitionInfo = new TCalcDeleteBitmapPartitionInfo(partitionId,
partitionVersions.get(partitionId),
Lists.newArrayList(tabletList));
- if (!baseCompactionCnts.isEmpty() && !cumulativeCompactionCnts.isEmpty()
- && !cumulativePoints.isEmpty()) {
+ if (!lockContext.getBaseCompactionCnts().isEmpty()
+ && !lockContext.getCumulativeCompactionCnts().isEmpty()
+ && !lockContext.getCumulativePoints().isEmpty()) {
List reqBaseCompactionCnts = Lists.newArrayList();
List reqCumulativeCompactionCnts = Lists.newArrayList();
List reqCumulativePoints = Lists.newArrayList();
for (long tabletId : tabletList) {
- reqBaseCompactionCnts.add(baseCompactionCnts.get(tabletId));
- reqCumulativeCompactionCnts.add(cumulativeCompactionCnts.get(tabletId));
- reqCumulativePoints.add(cumulativePoints.get(tabletId));
+ reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId));
+ reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId));
+ reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId));
}
partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts);
partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts);
@@ -818,10 +803,9 @@ private Map> getCalcDeleteBitmapInfo(
return backendToPartitionInfos;
}
- private void getDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId,
- Map> tableToTabletList, Map tabletToTabletMeta,
- Map baseCompactionCnts, Map cumulativeCompactionCnts,
- Map cumulativePoints) throws UserException {
+ private void getDeleteBitmapUpdateLock(long transactionId, List mowTableList,
+ List tabletCommitInfos, DeleteBitmapUpdateLockContext lockContext)
+ throws UserException {
if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep")) {
DebugPoint debugPoint = DebugPointUtil.getDebugPoint(
"CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep");
@@ -854,17 +838,15 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
+ getPartitionInfo(mowTableList, tabletCommitInfos, lockContext);
int totalRetryTime = 0;
- for (Map.Entry> entry : tableToParttions.entrySet()) {
+ for (Map.Entry> entry : lockContext.getTableToPartitions().entrySet()) {
GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder();
- builder.setTableId(entry.getKey())
- .setLockId(transactionId)
- .setInitiator(-1)
- .setExpiration(Config.delete_bitmap_lock_expiration_seconds)
- .setRequireCompactionStats(true);
- List tabletList = tableToTabletList.get(entry.getKey());
+ builder.setTableId(entry.getKey()).setLockId(transactionId).setInitiator(-1)
+ .setExpiration(Config.delete_bitmap_lock_expiration_seconds).setRequireCompactionStats(true);
+ List tabletList = lockContext.getTableToTabletList().get(entry.getKey());
for (Long tabletId : tabletList) {
- TabletMeta tabletMeta = tabletToTabletMeta.get(tabletId);
+ TabletMeta tabletMeta = lockContext.getTabletToTabletMeta().get(tabletId);
TabletIndexPB.Builder tabletIndexBuilder = TabletIndexPB.newBuilder();
tabletIndexBuilder.setDbId(tabletMeta.getDbId());
tabletIndexBuilder.setTableId(tabletMeta.getTableId());
@@ -881,16 +863,16 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo
try {
response = MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request);
if (LOG.isDebugEnabled()) {
- LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}",
- transactionId, request, response);
+ LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", transactionId,
+ request, response);
}
if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT
&& response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) {
break;
}
} catch (Exception e) {
- LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}",
- transactionId, retryTime, e);
+ LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}", transactionId,
+ retryTime, e);
}
// sleep random millis [20, 300] ms, avoid txn conflict
int randomMillis = 20 + (int) (Math.random() * (300 - 20));
@@ -906,8 +888,8 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo
Preconditions.checkNotNull(response);
Preconditions.checkNotNull(response.getStatus());
if (response.getStatus().getCode() != MetaServiceCode.OK) {
- LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}",
- transactionId, retryTime, response);
+ LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", transactionId,
+ retryTime, response);
if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT
|| response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) {
// DELETE_BITMAP_LOCK_ERR will be retried on be
@@ -928,30 +910,28 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo
if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()) {
throw new UserException("The size of returned compaction cnts can't match the size of tabletList, "
+ "tabletList.size()=" + tabletList.size() + ", respBaseCompactionCnts.size()=" + size1
- + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()="
- + size3);
+ + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3);
}
for (int i = 0; i < tabletList.size(); i++) {
long tabletId = tabletList.get(i);
- baseCompactionCnts.put(tabletId, respBaseCompactionCnts.get(i));
- cumulativeCompactionCnts.put(tabletId, respCumulativeCompactionCnts.get(i));
- cumulativePoints.put(tabletId, respCumulativePoints.get(i));
+ lockContext.getBaseCompactionCnts().put(tabletId, respBaseCompactionCnts.get(i));
+ lockContext.getCumulativeCompactionCnts().put(tabletId, respCumulativeCompactionCnts.get(i));
+ lockContext.getCumulativePoints().put(tabletId, respCumulativePoints.get(i));
}
totalRetryTime += retryTime;
}
stopWatch.stop();
if (totalRetryTime > 0 || stopWatch.getTime() > 20) {
- LOG.info(
- "get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. "
- + "partitionSize: {}. time cost: {} ms.",
- transactionId, totalRetryTime, tableToParttions.size(), stopWatch.getTime());
+ LOG.info("get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. "
+ + "partitionSize: {}. time cost: {} ms.", transactionId, totalRetryTime,
+ lockContext.getTableToPartitions().size(), stopWatch.getTime());
}
}
- private void removeDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId) {
- for (Map.Entry> entry : tableToParttions.entrySet()) {
+ private void removeDeleteBitmapUpdateLock(List tableList, long transactionId) {
+ for (OlapTable table : tableList) {
RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder();
- builder.setTableId(entry.getKey())
+ builder.setTableId(table.getId())
.setLockId(transactionId)
.setInitiator(-1);
final RemoveDeleteBitmapUpdateLockRequest request = builder.build();
@@ -978,6 +958,10 @@ private void removeDeleteBitmapUpdateLock(Map> tableToParttions,
private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
Map> backendToPartitionInfos,
long calculateDeleteBitmapTaskTimeoutSeconds) throws UserException {
+ if (backendToPartitionInfos == null) {
+ throw new UserException("failed to send calculate delete bitmap task to be,transactionId=" + transactionId
+ + ",but backendToPartitionInfos is null");
+ }
if (backendToPartitionInfos.isEmpty()) {
return;
}
@@ -1100,8 +1084,34 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
.collect(Collectors.toList());
List tableList = ((Database) db).getTablesOnIdOrderOrThrowException(tableIdList);
beforeCommitTransaction(tableList, transactionId, timeoutMillis);
+ List tabletCommitInfos = subTransactionStates.stream().map(
+ SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
+ .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList());
+ List mowTableList = getMowTableList(tableList, tabletCommitInfos);
try {
- commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates);
+ Map> backendToPartitionInfos = null;
+ if (!mowTableList.isEmpty()) {
+ DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
+ getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
+ if (lockContext.getBackendToPartitionTablets().isEmpty()) {
+ throw new UserException(
+ "The partition info is empty, table may be dropped, txnid=" + transactionId);
+ }
+ Map> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates,
+ lockContext.getTableToTabletList(),
+ lockContext.getTabletToTabletMeta());
+ backendToPartitionInfos = getCalcDeleteBitmapInfo(
+ lockContext, partitionToSubTxnIds);
+ }
+ commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates, mowTableList,
+ backendToPartitionInfos);
+ } catch (Exception e) {
+ if (!mowTableList.isEmpty()) {
+ LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId,
+ e.getMessage());
+ removeDeleteBitmapUpdateLock(mowTableList, transactionId);
+ }
+ throw e;
} finally {
afterCommitTransaction(tableList);
}
@@ -1109,13 +1119,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
}
private void commitTransactionWithSubTxns(long dbId, List tableList, long transactionId,
- List subTransactionStates) throws UserException {
- List tabletCommitInfos = subTransactionStates.stream().map(
- SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
- .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList());
- List mowTableList = getMowTableList(tableList, tabletCommitInfos);
+ List subTransactionStates, List mowTableList,
+ Map> backendToPartitionInfos) throws UserException {
if (!mowTableList.isEmpty()) {
- calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, subTransactionStates);
+ sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
+ Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
}
cleanSubTransactions(transactionId);
@@ -1196,7 +1204,8 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List tableList,
@Override
public void commitTransaction2PC(Database db, List tableList, long transactionId, long timeoutMillis)
throws UserException {
- commitTransaction(db.getId(), tableList, transactionId, null, null, true);
+ List mowTableList = getMowTableList(tableList, null);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
new file mode 100644
index 00000000000000..fcc84b9ca18227
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.transaction;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TabletMeta;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DeleteBitmapUpdateLockContext {
+ private Map baseCompactionCnts;
+ private Map cumulativeCompactionCnts;
+ private Map cumulativePoints;
+ private Map> tableToPartitions;
+ private Map partitions;
+ private Map>> backendToPartitionTablets;
+ private Map> tableToTabletList;
+ private Map tabletToTabletMeta;
+
+ public DeleteBitmapUpdateLockContext() {
+ baseCompactionCnts = Maps.newHashMap();
+ cumulativeCompactionCnts = Maps.newHashMap();
+ cumulativePoints = Maps.newHashMap();
+ tableToPartitions = Maps.newHashMap();
+ partitions = Maps.newHashMap();
+ backendToPartitionTablets = Maps.newHashMap();
+ tableToTabletList = Maps.newHashMap();
+ tabletToTabletMeta = Maps.newHashMap();
+ }
+
+ public Map> getTableToTabletList() {
+ return tableToTabletList;
+ }
+
+ public Map getBaseCompactionCnts() {
+ return baseCompactionCnts;
+ }
+
+ public Map getCumulativeCompactionCnts() {
+ return cumulativeCompactionCnts;
+ }
+
+ public Map getCumulativePoints() {
+ return cumulativePoints;
+ }
+
+ public Map>> getBackendToPartitionTablets() {
+ return backendToPartitionTablets;
+ }
+
+ public Map getPartitions() {
+ return partitions;
+ }
+
+ public Map> getTableToPartitions() {
+ return tableToPartitions;
+ }
+
+ public Map getTabletToTabletMeta() {
+ return tabletToTabletMeta;
+ }
+
+}
diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
new file mode 100644
index 00000000000000..b8b3ea3eccac14
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+
+-- !sql --
+5 e 90
+6 f 100
+
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
new file mode 100644
index 00000000000000..fa71c3644f2027
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string: [:]]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_param = { paramName, paramValue ->
+ // for eache be node, set paramName=paramValue
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def reset_be_param = { paramName ->
+ // for eache be node, reset paramName to default
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def original_value = backendId_to_params.get(id).get(paramName)
+ def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def get_be_param = { paramName ->
+ // for eache be node, get param value by default
+ def paramValue = ""
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ // get the config value from be
+ def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
+ assertTrue(code == 0)
+ assertTrue(out.contains(paramName))
+ // parsing
+ def resultList = parseJson(out)[0]
+ assertTrue(resultList.size() == 4)
+ // get original value
+ paramValue = resultList[2]
+ backendId_to_params.get(id, [:]).put(paramName, paramValue)
+ }
+ }
+
+ def customFeConfig = [
+ calculate_delete_bitmap_task_timeout_seconds: 2,
+ meta_service_rpc_retry_times : 5
+ ]
+
+ // store the original value
+ get_be_param("mow_stream_load_commit_retry_times")
+ // disable retry to make this problem more clear
+ set_be_param("mow_stream_load_commit_retry_times", "1")
+
+
+ def tableName = "tbl_basic"
+ setFeConfigTemporary(customFeConfig) {
+ try {
+ // create table
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"
+ );
+ """
+ // this streamLoad will fail on fe commit phase
+ GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null)
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("FE.mow.commit.exception"))
+ }
+ }
+ qt_sql """ select * from ${tableName} order by id"""
+
+ // this streamLoad will success because of removing exception injection
+ GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ qt_sql """ select * from ${tableName} order by id"""
+ } finally {
+ reset_be_param("mow_stream_load_commit_retry_times")
+ GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
+ sql "DROP TABLE IF EXISTS ${tableName};"
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+
+ }
+}