From 2eca43c8d059b236df7e7f8b4c59744d3dd1fef1 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Mon, 25 Nov 2024 19:35:12 +0800 Subject: [PATCH 1/4] [enhance](catalog)External partition prune return partitionName instead of partitionId (#44415) The partition ID of external data sources is meaningless, and some data sources only have partition names, so the return result of partition pruning is replaced with name instead of ID --- .../doris/datasource/hive/HMSExternalTable.java | 9 +++++++-- .../rules/OneListPartitionEvaluator.java | 14 +++++++------- .../expression/rules/OnePartitionEvaluator.java | 4 ++-- .../rules/OneRangePartitionEvaluator.java | 14 +++++++------- .../rules/expression/rules/PartitionPruner.java | 16 ++++++++-------- .../rules/UnknownPartitionEvaluator.java | 12 ++++++------ .../rules/rewrite/PruneFileScanPartition.java | 14 +++++++------- .../trees/plans/logical/LogicalFileScan.java | 6 +++--- 8 files changed, 47 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index aacd9268ae35cf..12c1f5f7cb6524 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -307,8 +307,13 @@ public SelectedPartitions getAllPartitions() { HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( this.getDbName(), this.getName(), partitionColumnTypes); Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - - return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false); + // transfer id to name + BiMap idToName = hivePartitionValues.getPartitionNameToIdMap().inverse(); + Map nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size()); + for (Entry entry : idToPartitionItem.entrySet()) { + 
nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue()); + } + return new SelectedPartitions(idToPartitionItem.size(), nameToPartitionItem, false); } public boolean isHiveTransactionalTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java index b9bdf520e3d6d4..ecf8a26724113f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java @@ -35,16 +35,16 @@ import java.util.stream.IntStream; /** OneListPartitionInputs */ -public class OneListPartitionEvaluator - extends DefaultExpressionRewriter> implements OnePartitionEvaluator { - private final long partitionId; +public class OneListPartitionEvaluator + extends DefaultExpressionRewriter> implements OnePartitionEvaluator { + private final K partitionIdent; private final List partitionSlots; private final ListPartitionItem partitionItem; private final ExpressionRewriteContext expressionRewriteContext; - public OneListPartitionEvaluator(long partitionId, List partitionSlots, + public OneListPartitionEvaluator(K partitionIdent, List partitionSlots, ListPartitionItem partitionItem, CascadesContext cascadesContext) { - this.partitionId = partitionId; + this.partitionIdent = partitionIdent; this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null"); this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null"); this.expressionRewriteContext = new ExpressionRewriteContext( @@ -52,8 +52,8 @@ public OneListPartitionEvaluator(long partitionId, List partitionSlots, } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java index c51252b44a624d..8810a04750f792 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java @@ -25,8 +25,8 @@ import java.util.Map; /** the evaluator of the partition which represent one partition */ -public interface OnePartitionEvaluator { - long getPartitionId(); +public interface OnePartitionEvaluator { + K getPartitionIdent(); /** * return a slot to expression mapping to replace the input. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java index 84a037171f32c5..1fb8954ab16547 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java @@ -80,10 +80,10 @@ * * you can see the process steps in the comment of PartitionSlotInput.columnRanges */ -public class OneRangePartitionEvaluator +public class OneRangePartitionEvaluator extends ExpressionVisitor - implements OnePartitionEvaluator { - private final long partitionId; + implements OnePartitionEvaluator { + private final K partitionIdent; private final List partitionSlots; private final RangePartitionItem partitionItem; private final ExpressionRewriteContext expressionRewriteContext; @@ -95,9 +95,9 @@ public class OneRangePartitionEvaluator private final Map slotToType; /** OneRangePartitionEvaluator */ - public OneRangePartitionEvaluator(long partitionId, List partitionSlots, + public OneRangePartitionEvaluator(K 
partitionIdent, List partitionSlots, RangePartitionItem partitionItem, CascadesContext cascadesContext, int expandThreshold) { - this.partitionId = partitionId; + this.partitionIdent = partitionIdent; this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null"); this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null"); this.expressionRewriteContext = new ExpressionRewriteContext( @@ -155,8 +155,8 @@ public OneRangePartitionEvaluator(long partitionId, List partitionSlots, } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java index efe12f38cd74e4..fac1a7f82d2cfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java @@ -102,21 +102,21 @@ public Expression visitComparisonPredicate(ComparisonPredicate cp, Void context) } /** prune */ - public List prune() { - Builder scanPartitionIds = ImmutableList.builder(); + public List prune() { + Builder scanPartitionIdents = ImmutableList.builder(); for (OnePartitionEvaluator partition : partitions) { if (!canBePrunedOut(partition)) { - scanPartitionIds.add(partition.getPartitionId()); + scanPartitionIdents.add((K) partition.getPartitionIdent()); } } - return scanPartitionIds.build(); + return scanPartitionIdents.build(); } /** * prune partition with `idToPartitions` as parameter. 
*/ - public static List prune(List partitionSlots, Expression partitionPredicate, - Map idToPartitions, CascadesContext cascadesContext, + public static List prune(List partitionSlots, Expression partitionPredicate, + Map idToPartitions, CascadesContext cascadesContext, PartitionTableType partitionTableType) { partitionPredicate = PartitionPruneExpressionExtractor.extract( partitionPredicate, ImmutableSet.copyOf(partitionSlots), cascadesContext); @@ -135,7 +135,7 @@ public static List prune(List partitionSlots, Expression partitionPr } List evaluators = Lists.newArrayListWithCapacity(idToPartitions.size()); - for (Entry kv : idToPartitions.entrySet()) { + for (Entry kv : idToPartitions.entrySet()) { evaluators.add(toPartitionEvaluator( kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold)); } @@ -147,7 +147,7 @@ public static List prune(List partitionSlots, Expression partitionPr /** * convert partition item to partition evaluator */ - public static final OnePartitionEvaluator toPartitionEvaluator(long id, PartitionItem partitionItem, + public static final OnePartitionEvaluator toPartitionEvaluator(K id, PartitionItem partitionItem, List partitionSlots, CascadesContext cascadesContext, int expandThreshold) { if (partitionItem instanceof ListPartitionItem) { return new OneListPartitionEvaluator( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java index ae313ca09de269..394182a1311484 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java @@ -28,18 +28,18 @@ import java.util.Map; /** UnknownPartitionEvaluator */ -public class UnknownPartitionEvaluator implements OnePartitionEvaluator { - private final long 
partitionId; +public class UnknownPartitionEvaluator implements OnePartitionEvaluator { + private final K partitionIdent; private final PartitionItem partitionItem; - public UnknownPartitionEvaluator(long partitionId, PartitionItem partitionItem) { - this.partitionId = partitionId; + public UnknownPartitionEvaluator(K partitionId, PartitionItem partitionItem) { + this.partitionIdent = partitionId; this.partitionItem = partitionItem; } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index 2de4efab2ff6ed..9bec6570822240 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -76,7 +76,7 @@ public Rule build() { private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { - Map selectedPartitionItems = Maps.newHashMap(); + Map selectedPartitionItems = Maps.newHashMap(); if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. 
@@ -91,13 +91,13 @@ private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); - Map idToPartitionItem = scan.getSelectedPartitions().selectedPartitions; - List prunedPartitions = new ArrayList<>(PartitionPruner.prune( - partitionSlots, filter.getPredicate(), idToPartitionItem, ctx, PartitionTableType.HIVE)); + Map nameToPartitionItem = scan.getSelectedPartitions().selectedPartitions; + List prunedPartitions = new ArrayList<>(PartitionPruner.prune( + partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.HIVE)); - for (Long id : prunedPartitions) { - selectedPartitionItems.put(id, idToPartitionItem.get(id)); + for (String name : prunedPartitions) { + selectedPartitionItems.put(name, nameToPartitionItem.get(name)); } - return new SelectedPartitions(idToPartitionItem.size(), selectedPartitionItems, true); + return new SelectedPartitions(nameToPartitionItem.size(), selectedPartitionItems, true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 0a2c69b68c1d33..ab4ef8efa3c5df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -147,9 +147,9 @@ public static class SelectedPartitions { */ public final long totalPartitionNum; /** - * partition id -> partition item + * partition name -> partition item */ - public final Map selectedPartitions; + public final Map selectedPartitions; /** * true means the result is after partition pruning * false means the partition pruning is not processed. @@ -159,7 +159,7 @@ public static class SelectedPartitions { /** * Constructor for SelectedPartitions. 
*/ - public SelectedPartitions(long totalPartitionNum, Map selectedPartitions, + public SelectedPartitions(long totalPartitionNum, Map selectedPartitions, boolean isPruned) { this.totalPartitionNum = totalPartitionNum; this.selectedPartitions = ImmutableMap.copyOf(Objects.requireNonNull(selectedPartitions, From efbd3890b1485b21fbe1a0efc8a535511217ce41 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 26 Nov 2024 11:51:28 +0800 Subject: [PATCH 2/4] [enhance](catalog)Unified external partition prune interface (#44567) ### What problem does this PR solve? Previously, external partition pruning only supported Hive. If you want to support other types of tables, you need to understand the internal processing logic of partition pruning. This PR abstracts the logic of partition pruning, and other table types can be supported by simply overriding a few methods of ExternalTable ### Release note [opt](planner) Unified external partition prune interface --- .../doris/datasource/ExternalTable.java | 53 +++++++++++++++++++ .../datasource/hive/HMSExternalTable.java | 23 ++++++-- .../nereids/rules/analysis/BindRelation.java | 1 - .../rules/rewrite/PruneFileScanPartition.java | 18 +++---- .../trees/plans/logical/LogicalFileScan.java | 11 ++-- 5 files changed, 82 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 1eadb46fe82eed..3aee5550acf646 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableAttributes; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIndexes; @@ -30,6 +31,7 @@ 
import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo; @@ -41,6 +43,7 @@ import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import lombok.Getter; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,9 +51,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; /** @@ -364,4 +369,52 @@ protected Optional getSchemaCacheValue() { public TableIndexes getTableIndexes() { return new TableIndexes(); } + + /** + * Retrieve all partitions and initialize SelectedPartitions + * + * @param snapshotId if not support mvcc, ignore this + * @return + */ + public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { + if (!supportPartitionPruned()) { + return SelectedPartitions.NOT_PRUNED; + } + if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) { + return SelectedPartitions.NOT_PRUNED; + } + Map nameToPartitionItems = getNameToPartitionItems(snapshotId); + return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false); + } + + /** + * get partition map + * If partition related operations are supported, this method needs to be implemented in the subclass + * + * @param snapshotId if not support mvcc, ignore this + * @return partitionName ==> PartitionItem + */ + public Map getNameToPartitionItems(OptionalLong snapshotId) { + return 
Collections.emptyMap(); + } + + /** + * get partition column list + * If partition related operations are supported, this method needs to be implemented in the subclass + * + * @param snapshotId if not support mvcc, ignore this + * @return + */ + public List getPartitionColumns(OptionalLong snapshotId) { + return Collections.emptyList(); + } + + /** + * Does it support partition cpruned, If so, this method needs to be overridden in subclasses + * + * @return + */ + public boolean supportPartitionPruned() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 12c1f5f7cb6524..13012a03f55b73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -40,7 +40,6 @@ import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.nereids.exceptions.NotSupportedException; -import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; @@ -296,11 +295,25 @@ public List getPartitionColumns() { .orElse(Collections.emptyList()); } - public SelectedPartitions getAllPartitions() { + @Override + public List getPartitionColumns(OptionalLong snapshotId) { + return getPartitionColumns(); + } + + @Override + public boolean supportPartitionPruned() { + return getDlaType() == DLAType.HIVE; + } + + @Override + public Map getNameToPartitionItems(OptionalLong snapshotId) { + return getNameToPartitionItems(); + } + + public Map getNameToPartitionItems() { if (CollectionUtils.isEmpty(this.getPartitionColumns())) { - return SelectedPartitions.NOT_PRUNED; + return Collections.emptyMap(); } - 
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) this.getCatalog()); List partitionColumnTypes = this.getPartitionColumnTypes(); @@ -313,7 +326,7 @@ public SelectedPartitions getAllPartitions() { for (Entry entry : idToPartitionItem.entrySet()) { nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue()); } - return new SelectedPartitions(idToPartitionItem.size(), nameToPartitionItem, false); + return nameToPartitionItem; } public boolean isHiveTransactionalTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index c8d0a6f5dd4757..2bc9fb4c729d08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -435,7 +435,6 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio } else { return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, qualifierWithoutTableName, - ((HMSExternalTable) table).getAllPartitions(), unboundRelation.getTableSample(), unboundRelation.getTableSnapshot()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index 9bec6570822240..d50219383072df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -19,8 +19,6 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; -import org.apache.doris.datasource.hive.HMSExternalTable; -import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import 
org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; @@ -38,6 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -60,10 +59,8 @@ public Rule build() { ExternalTable tbl = scan.getTable(); SelectedPartitions selectedPartitions; - // TODO(cmy): support other external table - if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) { - HMSExternalTable hiveTbl = (HMSExternalTable) tbl; - selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext); + if (tbl.supportPartitionPruned()) { + selectedPartitions = pruneExternalPartitions(tbl, filter, scan, ctx.cascadesContext); } else { // set isPruned so that it won't go pass the partition prune again selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true); @@ -74,10 +71,11 @@ public Rule build() { }).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE); } - private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, + private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); - if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) { + // todo: real snapshotId + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(OptionalLong.empty()))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. 
return SelectedPartitions.NOT_PRUNED; @@ -85,8 +83,8 @@ private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, Map scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - - List partitionSlots = hiveTbl.getPartitionColumns() + // todo: real snapshotId + List partitionSlots = externalTable.getPartitionColumns(OptionalLong.empty()) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index ab4ef8efa3c5df..010c30d915d529 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; /** * Logical file scan for external catalog. 
@@ -59,17 +60,11 @@ protected LogicalFileScan(RelationId id, ExternalTable table, List quali this.tableSnapshot = tableSnapshot; } - public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, - SelectedPartitions selectedPartitions, - Optional tableSample, Optional tableSnapshot) { - this(id, table, qualifier, Optional.empty(), Optional.empty(), - selectedPartitions, tableSample, tableSnapshot); - } - public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, Optional tableSample, Optional tableSnapshot) { + // todo: real snapshotId this(id, table, qualifier, Optional.empty(), Optional.empty(), - SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot); + table.initSelectedPartitions(OptionalLong.empty()), tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { From a616855deec8b3368efd85921370846230a1a5b3 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 28 Nov 2024 10:22:20 +0800 Subject: [PATCH 3/4] [feat](mtmv)Unified external table interface supporting partition refresh and partition pruning (#44673) - Add `MvccTable` to represent a table that supports querying specified version data - Add the `MvccSnapshot` interface to store snapshot information of mvcc at a certain moment in time - Add the `MvccSnapshot` parameter to the methods of the `MTMVRelatedTableIf` interface to retrieve data of a specified version - Partition pruning related methods combined with the `MvccSnapshot` parameter are used to obtain partition information for a specified version - Load the snapshot information of mvccTable at the beginning of the query plan and store it in StatementContext Unified external table interface supporting partition refresh and partition pruning --- .../org/apache/doris/catalog/OlapTable.java | 23 ++++- .../doris/datasource/ExternalTable.java | 18 ++-- .../datasource/hive/HMSExternalTable.java | 35 ++++---- .../doris/datasource/mvcc/MvccSnapshot.java | 25 ++++++ .../doris/datasource/mvcc/MvccTable.java 
| 33 ++++++++ .../doris/datasource/mvcc/MvccTableInfo.java | 84 +++++++++++++++++++ .../paimon/PaimonExternalTable.java | 1 + .../mtmv/MTMVPartitionExprDateTrunc.java | 2 +- .../apache/doris/mtmv/MTMVPartitionInfo.java | 3 +- .../apache/doris/mtmv/MTMVPartitionUtil.java | 11 +-- ...MTMVRelatedPartitionDescInitGenerator.java | 3 +- ...MVRelatedPartitionDescRollUpGenerator.java | 3 +- .../apache/doris/mtmv/MTMVRelatedTableIf.java | 22 +++-- .../apache/doris/nereids/CascadesContext.java | 8 ++ .../apache/doris/nereids/NereidsPlanner.java | 2 +- .../doris/nereids/StatementContext.java | 31 +++++++ .../exploration/mv/MaterializedViewUtils.java | 4 +- .../rules/rewrite/PruneFileScanPartition.java | 6 +- .../plans/commands/info/CreateMTMVInfo.java | 2 +- .../info/MTMVPartitionDefinition.java | 3 +- .../trees/plans/logical/LogicalFileScan.java | 3 +- .../doris/mtmv/MTMVPartitionUtilTest.java | 5 +- 22 files changed, 271 insertions(+), 56 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 731a60e9a49f86..96ebd89b6cc980 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -50,6 +50,7 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; @@ -953,6 +954,10 @@ public PartitionInfo getPartitionInfo() { } 
@Override + public Set getPartitionColumnNames(Optional snapshot) throws DdlException { + return getPartitionColumnNames(); + } + public Set getPartitionColumnNames() throws DdlException { Set partitionColumnNames = Sets.newHashSet(); if (partitionInfo instanceof SinglePartitionInfo) { @@ -3117,11 +3122,20 @@ public long getVisibleVersionTime() { } @Override + public PartitionType getPartitionType(Optional snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return partitionInfo.getType(); } @Override + public Map getAndCopyPartitionItems(Optional snapshot) + throws AnalysisException { + return getAndCopyPartitionItems(); + } + public Map getAndCopyPartitionItems() throws AnalysisException { if (!tryReadLock(1, TimeUnit.MINUTES)) { throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName()); @@ -3141,12 +3155,17 @@ public Map getAndCopyPartitionItems() throws AnalysisExce } @Override + public List getPartitionColumns(Optional snapshot) { + return getPartitionColumns(); + } + public List getPartitionColumns() { return getPartitionInfo().getPartitionColumns(); } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException { Map partitionVersions = context.getBaseVersions().getPartitionVersions(); long partitionId = getPartitionOrAnalysisException(partitionName).getId(); @@ -3156,7 +3175,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) { Map tableVersions = context.getBaseVersions().getTableVersions(); long visibleVersion = tableVersions.containsKey(id) ? 
tableVersions.get(id) : getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion, id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 3aee5550acf646..d82959954f2607 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; @@ -55,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; /** @@ -373,17 +373,17 @@ public TableIndexes getTableIndexes() { /** * Retrieve all partitions and initialize SelectedPartitions * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return */ - public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { + public SelectedPartitions initSelectedPartitions(Optional snapshot) { if (!supportPartitionPruned()) { return SelectedPartitions.NOT_PRUNED; } - if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) { + if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) { return SelectedPartitions.NOT_PRUNED; } - Map nameToPartitionItems = getNameToPartitionItems(snapshotId); + Map nameToPartitionItems = getNameToPartitionItems(snapshot); return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false); } @@ -391,10 +391,10 @@ public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { * get partition 
map * If partition related operations are supported, this method needs to be implemented in the subclass * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return partitionName ==> PartitionItem */ - public Map getNameToPartitionItems(OptionalLong snapshotId) { + public Map getNameToPartitionItems(Optional snapshot) { return Collections.emptyMap(); } @@ -402,10 +402,10 @@ public Map getNameToPartitionItems(OptionalLong snapshotI * get partition column list * If partition related operations are supported, this method needs to be implemented in the subclass * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return */ - public List getPartitionColumns(OptionalLong snapshotId) { + public List getPartitionColumns(Optional snapshot) { return Collections.emptyList(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 13012a03f55b73..6d65f8bcdbccb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -287,7 +288,6 @@ public List getPartitionColumnTypes() { .orElse(Collections.emptyList()); } - @Override public List getPartitionColumns() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); @@ -296,7 +296,7 @@ public List getPartitionColumns() { } @Override - public List 
getPartitionColumns(OptionalLong snapshotId) { + public List getPartitionColumns(Optional snapshot) { return getPartitionColumns(); } @@ -306,7 +306,7 @@ public boolean supportPartitionPruned() { } @Override - public Map getNameToPartitionItems(OptionalLong snapshotId) { + public Map getNameToPartitionItems(Optional snapshot) { return getNameToPartitionItems(); } @@ -755,34 +755,32 @@ public Set getDistributionColumnNames() { } @Override + public PartitionType getPartitionType(Optional snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override + public Set getPartitionColumnNames(Optional snapshot) { + return getPartitionColumnNames(); + } + public Set getPartitionColumnNames() { return getPartitionColumns().stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override - public Map getAndCopyPartitionItems() { - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) getCatalog()); - HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( - getDbName(), getName(), getPartitionColumnTypes()); - Map res = Maps.newHashMap(); - Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - BiMap idToName = hivePartitionValues.getPartitionNameToIdMap().inverse(); - for (Entry entry : idToPartitionItem.entrySet()) { - res.put(idToName.get(entry.getKey()), entry.getValue()); - } - return res; + public Map getAndCopyPartitionItems(Optional snapshot) { + return getNameToPartitionItems(); } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) - throws AnalysisException { + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() 
.getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -794,7 +792,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) + throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java new file mode 100644 index 00000000000000..d7826b0a5de19e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.mvcc; + +/** + * The snapshot information of mvcc is defined by each table, + * but it should be ensured that the table information queried through this snapshot remains unchanged + */ +public interface MvccSnapshot { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java new file mode 100644 index 00000000000000..d69e0f3114df0c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; + +/** + * The table that needs to query data based on the version needs to implement this interface. 
+ */ +public interface MvccTable extends TableIf { + /** + * Retrieve the current snapshot information of the table, + * and the returned result will be used for the entire process of this query + * + * @return MvccSnapshot + */ + MvccSnapshot loadSnapshot(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java new file mode 100644 index 00000000000000..0d865f837c8c4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; + +import com.google.common.base.Objects; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class MvccTableInfo { + private static final Logger LOG = LogManager.getLogger(MvccTableInfo.class); + + private String tableName; + private String dbName; + private String ctlName; + + public MvccTableInfo(TableIf table) { + java.util.Objects.requireNonNull(table, "table is null"); + DatabaseIf database = table.getDatabase(); + java.util.Objects.requireNonNull(database, "database is null"); + CatalogIf catalog = database.getCatalog(); + java.util.Objects.requireNonNull(catalog, "catalog is null"); + this.tableName = table.getName(); + this.dbName = database.getFullName(); + this.ctlName = catalog.getName(); + } + + public String getTableName() { + return tableName; + } + + public String getDbName() { + return dbName; + } + + public String getCtlName() { + return ctlName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MvccTableInfo that = (MvccTableInfo) o; + return Objects.equal(tableName, that.tableName) && Objects.equal( + dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); + } + + @Override + public int hashCode() { + return Objects.hashCode(tableName, dbName, ctlName); + } + + @Override + public String toString() { + return "MvccTableInfo{" + + "tableName='" + tableName + '\'' + + ", dbName='" + dbName + '\'' + + ", ctlName='" + ctlName + '\'' + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c9eaf1b7df32ef..224f8144ccef61 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; public class PaimonExternalTable extends ExternalTable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java index ea15c84d1b925d..95a8717e01c4c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java @@ -69,7 +69,7 @@ public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits)); } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { Type partitionColumnType = MTMVPartitionUtil .getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index b3cd239269abc7..7eae44db0af4cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -25,6 +25,7 @@ import com.google.gson.annotations.SerializedName; import java.util.List; +import java.util.Optional; /** * MTMVPartitionInfo @@ -115,7 +116,7 @@ public int getRelatedColPos() throws AnalysisException { if (partitionType == MTMVPartitionType.SELF_MANAGE) { throw new 
AnalysisException("partitionType is: " + partitionType); } - List partitionColumns = getRelatedTable().getPartitionColumns(); + List partitionColumns = getRelatedTable().getPartitionColumns(Optional.empty()); for (int i = 0; i < partitionColumns.size(); i++) { if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) { return i; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 1cfb5e021a5309..8ba022de415006 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -329,7 +330,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt } for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -446,7 +447,7 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt if (!baseTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context); + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, Optional.empty()); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot); } @@ -482,7 +483,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres MTMVRelatedTableIf relatedTable = 
mtmv.getMvPartitionInfo().getRelatedTable(); for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); refreshPartitionSnapshot.getPartitions() .put(relatedPartitionName, partitionSnapshot); } @@ -497,13 +498,13 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres continue; } refreshPartitionSnapshot.addTableSnapshot(baseTableInfo, - ((MTMVRelatedTableIf) table).getTableSnapshot(context)); + ((MTMVRelatedTableIf) table).getTableSnapshot(context, Optional.empty())); } return refreshPartitionSnapshot; } public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, String col) throws AnalysisException { - List partitionColumns = relatedTable.getPartitionColumns(); + List partitionColumns = relatedTable.getPartitionColumns(Optional.empty()); for (Column column : partitionColumns) { if (column.getName().equals(col)) { return column.getType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java index 13b58239376116..c6b4e331184e2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java @@ -20,6 +20,7 @@ import org.apache.doris.common.AnalysisException; import java.util.Map; +import java.util.Optional; /** * get all related partition descs @@ -29,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti @Override public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, RelatedPartitionDescResult lastResult) throws AnalysisException { - lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems()); + 
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(Optional.empty())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java index 76e20ef70f5d92..325fab819d9a09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; /** @@ -45,7 +46,7 @@ public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvPrope return; } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { lastResult.setDescs(rollUpRange(lastResult.getDescs(), mvPartitionInfo)); } else if (partitionType == PartitionType.LIST) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 4a8b14603ce4d6..c4261aa78f10be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -23,9 +23,11 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -38,31 +40,35 @@ public interface MTMVRelatedTableIf extends TableIf { * Note: This method is called every time there is a refresh and transparent rewrite, * so if this 
method is slow, it will significantly reduce query performance * + * @param snapshot * @return partitionName->PartitionItem */ - Map getAndCopyPartitionItems() throws AnalysisException; + Map getAndCopyPartitionItems(Optional snapshot) throws AnalysisException; /** * getPartitionType LIST/RANGE/UNPARTITIONED * + * @param snapshot * @return */ - PartitionType getPartitionType(); + PartitionType getPartitionType(Optional snapshot); /** * getPartitionColumnNames * + * @param snapshot * @return * @throws DdlException */ - Set getPartitionColumnNames() throws DdlException; + Set getPartitionColumnNames(Optional snapshot) throws DdlException; /** * getPartitionColumns * + * @param snapshot * @return */ - List getPartitionColumns(); + List getPartitionColumns(Optional snapshot); /** * getPartitionSnapshot @@ -70,12 +76,14 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshot * @param partitionName * @param context * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException; /** * getTableSnapshot @@ -83,11 +91,13 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshot * @param context * @return table snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException; + MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) + throws AnalysisException; /** * Does the current type of table allow timed triggering diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index a5c966370f030d..17ae5883063fb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -70,6 +70,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import org.apache.commons.collections.MapUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -462,6 +463,13 @@ private Set> getTables(LogicalPlan logicalPlan) { return tableNames; } + public Map, TableIf> getOrExtractTables(LogicalPlan logicalPlan) { + if (MapUtils.isEmpty(tables)) { + extractTables(logicalPlan); + } + return tables; + } + private Set> extractTableNamesFromHaving(LogicalHaving having) { Set subqueryExprs = having.getPredicate() .collect(SubqueryExpr.class::isInstance); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index e6ee5482d49a94..d365ff912de334 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -216,7 +216,7 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, plan = preprocess(plan); initCascadesContext(plan, requireProperties); - + statementContext.loadSnapshots(cascadesContext.getOrExtractTables(plan)); try (Lock lock = new Lock(plan, cascadesContext)) { Plan resultPlan = planWithoutLock(plan, explainLevel, showPlanProcess, requireProperties); lockCallback.accept(resultPlan); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index ed64864da5079f..ce897adf79c3b9 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -24,6 +24,9 @@ import org.apache.doris.common.Id; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.nereids.hint.Hint; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.rules.analysis.ColumnAliasGenerator; @@ -171,6 +174,8 @@ public class StatementContext implements Closeable { private String disableJoinReorderReason; + private final Map snapshots = Maps.newHashMap(); + public StatementContext() { this(ConnectContext.get(), null, 0); } @@ -505,6 +510,32 @@ public void addPlannerHook(PlannerHook plannerHook) { this.plannerHooks.add(plannerHook); } + /** + * Load snapshot information of mvcc + * + * @param tables Tables used in queries + */ + public void loadSnapshots(Map, TableIf> tables) { + if (tables == null) { + return; + } + for (TableIf tableIf : tables.values()) { + if (tableIf instanceof MvccTable) { + snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + } + } + } + + /** + * Obtain snapshot information of mvcc + * + * @param mvccTable mvccTable + * @return MvccSnapshot + */ + public MvccSnapshot getSnapshot(MvccTable mvccTable) { + return snapshots.get(new MvccTableInfo(mvccTable)); + } + private static class CloseableResource implements Closeable { public final String resourceName; public final String threadName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index a659c2f9990a3f..484abd11f01e72 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -481,13 +481,13 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte return null; } MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table; - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (PartitionType.UNPARTITIONED.equals(type)) { context.addFailReason(String.format("related base table is not partition table, the table is %s", table.getName())); return null; } - Set partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns()); + Set partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns(Optional.empty())); Column mvReferenceColumn = contextPartitionColumn.getColumn().get(); Expr definExpr = mvReferenceColumn.getDefineExpr(); if (definExpr instanceof SlotRef) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index d50219383072df..4bbb0a8aa76270 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -36,7 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -75,7 +75,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); // todo: real snapshotId - if 
(CollectionUtils.isEmpty(externalTable.getPartitionColumns(OptionalLong.empty()))) { + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -84,7 +84,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); // todo: real snapshotId - List partitionSlots = externalTable.getPartitionColumns(OptionalLong.empty()) + List partitionSlots = externalTable.getPartitionColumns(Optional.empty()) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index de5e188d5a65bf..8c44b42a5ccfbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -351,7 +351,7 @@ private PartitionDesc generatePartitionDesc(ConnectContext ctx) { allPartitionDescs.size(), ctx.getSessionVariable().getCreateTablePartitionMaxNum())); } try { - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (type == PartitionType.RANGE) { return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()), allPartitionDescs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java index 427e2368e7ab2b..c4117e8608e29d 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java @@ -54,6 +54,7 @@ import com.google.common.collect.Sets; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -147,7 +148,7 @@ private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectCont MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo()); Set partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); try { - partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames()); + partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty())); } catch (DdlException e) { throw new AnalysisException(e.getMessage(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 010c30d915d529..96b8e032d11274 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -36,7 +36,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; /** * Logical file scan for external catalog. 
@@ -64,7 +63,7 @@ public LogicalFileScan(RelationId id, ExternalTable table, List qualifie Optional tableSample, Optional tableSnapshot) { // todo: real snapshotId this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(OptionalLong.empty()), tableSample, tableSnapshot); + table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 997385742dc09a..e5d2e21a8db626 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.util.List; +import java.util.Optional; import java.util.Set; public class MTMVPartitionUtilTest { @@ -112,7 +113,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getTableSnapshot((MTMVRefreshContext) any); + baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf; @@ -132,7 +133,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any); + baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf; From 4685c6e16498c054a9930f0996d249dd48722f5a Mon Sep 17 00:00:00 2001 From: zhangdong Date: Mon, 2 Dec 2024 12:30:55 +0800 Subject: [PATCH 4/4] 1 --- .../org/apache/doris/datasource/paimon/PaimonExternalTable.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 224f8144ccef61..c9eaf1b7df32ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -46,7 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; public class PaimonExternalTable extends ExternalTable {