diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java index c7b8f452afbe66..3347518b1652a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneEmptyPartition.java @@ -18,11 +18,16 @@ package org.apache.doris.nereids.rules.rewrite; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.transaction.TransactionEntry; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; @@ -30,6 +35,7 @@ * Used to prune empty partition. */ public class PruneEmptyPartition extends OneRewriteRuleFactory { + public static final Logger LOG = LogManager.getLogger(PruneEmptyPartition.class); @Override public Rule build() { @@ -38,6 +44,11 @@ public Rule build() { OlapTable table = scan.getTable(); List partitionIdsToPrune = scan.getSelectedPartitionIds(); List ids = table.selectNonEmptyPartitionIds(partitionIdsToPrune); + if (ctx.connectContext != null && ctx.connectContext.isTxnModel()) { + // In transaction load, need to add empty partitions which have invisible data of sub transactions + selectNonEmptyPartitionIdsForTxnLoad(ctx.connectContext.getTxnEntry(), table, scan.getSelectedIndexId(), + partitionIdsToPrune, ids); + } if (ids.isEmpty()) { return new LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(), scan.getOutput()); @@ -49,4 +60,24 @@ public Rule build() { return scan.withSelectedPartitionIds(ids); }).toRule(RuleType.PRUNE_EMPTY_PARTITION); } + + private void selectNonEmptyPartitionIdsForTxnLoad(TransactionEntry txnEntry, OlapTable table, long indexId, + List selectedPartitions, List nonEmptyPartitionIds) { + for (Long selectedPartitionId : selectedPartitions) { + if (nonEmptyPartitionIds.contains(selectedPartitionId)) { + continue; + } + Partition partition = table.getPartition(selectedPartitionId); + if (partition == null) { + continue; + } + if (!txnEntry.getPartitionSubTxnIds(table.getId(), partition, indexId).isEmpty()) { + nonEmptyPartitionIds.add(selectedPartitionId); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("add partition for txn load, table: {}, selected partitions: {}, non empty partitions: {}", + table.getId(), selectedPartitions, nonEmptyPartitionIds); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 25c4ff4b3b2493..70c281201b6f66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -22,7 +22,9 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr; @@ -501,6 +503,30 @@ public void setTxnLoadInfoInObserver(TTxnLoadInfo txnLoadInfo) throws DdlExcepti + "subTxnStates={}", label, transactionId, dbId, timeoutTimestamp, allSubTxnNum, subTransactionStates); } + public List getPartitionSubTxnIds(long tableId, Partition partition, long indexId) { + List subTxnIds = new ArrayList<>(); + MaterializedIndex index = partition.getIndex(indexId); + if (index == null) { + LOG.error("index={} not found in table={}, partition={}", indexId, tableId, partition.getId()); + return subTxnIds; + } + for (SubTransactionState subTransactionState : subTransactionStates) { + if (subTransactionState.getTable().getId() != tableId) { + continue; + } + for (TTabletCommitInfo tabletCommitInfo : subTransactionState.getTabletCommitInfos()) { + if (index.getTablet(tabletCommitInfo.getTabletId()) != null) { + subTxnIds.add(subTransactionState.getSubTransactionId()); + break; + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("table_id={}, partition_id={}, sub_txn_ids={}", tableId, partition.getId(), subTxnIds); + } + return subTxnIds; + } + private void resetByTxnInfo(TTxnLoadInfo txnLoadInfo) throws DdlException { if (txnLoadInfo.isSetDbId()) { this.dbId = txnLoadInfo.getDbId(); diff --git a/regression-test/suites/insert_p0/transaction/txn_insert.groovy b/regression-test/suites/insert_p0/transaction/txn_insert.groovy index 41b8a6d35dd909..319a20fdaba27d 100644 --- a/regression-test/suites/insert_p0/transaction/txn_insert.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert.groovy @@ -793,6 +793,28 @@ suite("txn_insert") { order_qt_select_cu2 """select * from ${unique_table}_2""" order_qt_select_cu3 """select * from ${unique_table}_3""" } + + // 19. delete from empty table + sql """ drop table if exists txn_insert_dt6; """ + sql """ + CREATE TABLE `txn_insert_dt6` ( + `ID` int NOT NULL, + `NAME` varchar(100) NULL, + `SCORE` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`ID`) + DISTRIBUTED BY HASH(`ID`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ begin; """ + sql """ INSERT INTO txn_insert_dt6 select 1, 'Alice', 100; """ + test { + sql """ delete from txn_insert_dt6 where id = 1; """ + exception """Can not delete because there is a insert operation for the same table""" + } + sql """ rollback; """ } def db_name = "regression_test_insert_p0_transaction"