From 6d57f7d7d8091b1cd205aae2464448f65f3c0d7a Mon Sep 17 00:00:00 2001 From: zhangdong Date: Mon, 25 Nov 2024 19:35:12 +0800 Subject: [PATCH 1/4] [enhance](catalog)External partition prune return partitionName instead of partitionId (#44415) The partition ID of external data sources is meaningless, and some data sources only have partition names, so the return result of partition pruning is replaced with name instead of ID --- .../doris/datasource/hive/HMSExternalTable.java | 9 +++++++-- .../rules/OneListPartitionEvaluator.java | 14 +++++++------- .../expression/rules/OnePartitionEvaluator.java | 4 ++-- .../rules/OneRangePartitionEvaluator.java | 14 +++++++------- .../rules/expression/rules/PartitionPruner.java | 16 ++++++++-------- .../rules/UnknownPartitionEvaluator.java | 12 ++++++------ .../rules/rewrite/PruneFileScanPartition.java | 14 +++++++------- .../trees/plans/logical/LogicalFileScan.java | 6 +++--- 8 files changed, 47 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 5df44fda4769fe..0ff020c130b992 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -307,8 +307,13 @@ public SelectedPartitions getAllPartitions() { HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( this.getDbName(), this.getName(), partitionColumnTypes); Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - - return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false); + // transfer id to name + BiMap idToName = hivePartitionValues.getPartitionNameToIdMap().inverse(); + Map nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size()); + for (Entry entry : idToPartitionItem.entrySet()) { + nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue()); + } + return new SelectedPartitions(idToPartitionItem.size(), nameToPartitionItem, false); } public boolean isHiveTransactionalTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java index b9bdf520e3d6d4..ecf8a26724113f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneListPartitionEvaluator.java @@ -35,16 +35,16 @@ import java.util.stream.IntStream; /** OneListPartitionInputs */ -public class OneListPartitionEvaluator - extends DefaultExpressionRewriter> implements OnePartitionEvaluator { - private final long partitionId; +public class OneListPartitionEvaluator + extends DefaultExpressionRewriter> implements OnePartitionEvaluator { + private final K partitionIdent; private final List partitionSlots; private final ListPartitionItem partitionItem; private final ExpressionRewriteContext expressionRewriteContext; - public OneListPartitionEvaluator(long partitionId, List partitionSlots, + public OneListPartitionEvaluator(K partitionIdent, List partitionSlots, ListPartitionItem partitionItem, CascadesContext cascadesContext) { - this.partitionId = partitionId; + this.partitionIdent = partitionIdent; this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null"); this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null"); this.expressionRewriteContext = new ExpressionRewriteContext( @@ -52,8 +52,8 @@ public OneListPartitionEvaluator(long partitionId, List partitionSlots, } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java index c51252b44a624d..8810a04750f792 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OnePartitionEvaluator.java @@ -25,8 +25,8 @@ import java.util.Map; /** the evaluator of the partition which represent one partition */ -public interface OnePartitionEvaluator { - long getPartitionId(); +public interface OnePartitionEvaluator { + K getPartitionIdent(); /** * return a slot to expression mapping to replace the input. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java index 84a037171f32c5..1fb8954ab16547 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/OneRangePartitionEvaluator.java @@ -80,10 +80,10 @@ * * you can see the process steps in the comment of PartitionSlotInput.columnRanges */ -public class OneRangePartitionEvaluator +public class OneRangePartitionEvaluator extends ExpressionVisitor - implements OnePartitionEvaluator { - private final long partitionId; + implements OnePartitionEvaluator { + private final K partitionIdent; private final List partitionSlots; private final RangePartitionItem partitionItem; private final ExpressionRewriteContext expressionRewriteContext; @@ -95,9 +95,9 @@ public class OneRangePartitionEvaluator private final Map slotToType; /** OneRangePartitionEvaluator */ - public OneRangePartitionEvaluator(long partitionId, List partitionSlots, + public OneRangePartitionEvaluator(K partitionIdent, List partitionSlots, RangePartitionItem partitionItem, CascadesContext cascadesContext, int expandThreshold) { - this.partitionId = partitionId; + this.partitionIdent = partitionIdent; this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null"); this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null"); this.expressionRewriteContext = new ExpressionRewriteContext( @@ -155,8 +155,8 @@ public OneRangePartitionEvaluator(long partitionId, List partitionSlots, } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java index efe12f38cd74e4..fac1a7f82d2cfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java @@ -102,21 +102,21 @@ public Expression visitComparisonPredicate(ComparisonPredicate cp, Void context) } /** prune */ - public List prune() { - Builder scanPartitionIds = ImmutableList.builder(); + public List prune() { + Builder scanPartitionIdents = ImmutableList.builder(); for (OnePartitionEvaluator partition : partitions) { if (!canBePrunedOut(partition)) { - scanPartitionIds.add(partition.getPartitionId()); + scanPartitionIdents.add((K) partition.getPartitionIdent()); } } - return scanPartitionIds.build(); + return scanPartitionIdents.build(); } /** * prune partition with `idToPartitions` as parameter. */ - public static List prune(List partitionSlots, Expression partitionPredicate, - Map idToPartitions, CascadesContext cascadesContext, + public static List prune(List partitionSlots, Expression partitionPredicate, + Map idToPartitions, CascadesContext cascadesContext, PartitionTableType partitionTableType) { partitionPredicate = PartitionPruneExpressionExtractor.extract( partitionPredicate, ImmutableSet.copyOf(partitionSlots), cascadesContext); @@ -135,7 +135,7 @@ public static List prune(List partitionSlots, Expression partitionPr } List evaluators = Lists.newArrayListWithCapacity(idToPartitions.size()); - for (Entry kv : idToPartitions.entrySet()) { + for (Entry kv : idToPartitions.entrySet()) { evaluators.add(toPartitionEvaluator( kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold)); } @@ -147,7 +147,7 @@ public static List prune(List partitionSlots, Expression partitionPr /** * convert partition item to partition evaluator */ - public static final OnePartitionEvaluator toPartitionEvaluator(long id, PartitionItem partitionItem, + public static final OnePartitionEvaluator toPartitionEvaluator(K id, PartitionItem partitionItem, List partitionSlots, CascadesContext cascadesContext, int expandThreshold) { if (partitionItem instanceof ListPartitionItem) { return new OneListPartitionEvaluator( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java index ae313ca09de269..394182a1311484 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/UnknownPartitionEvaluator.java @@ -28,18 +28,18 @@ import java.util.Map; /** UnknownPartitionEvaluator */ -public class UnknownPartitionEvaluator implements OnePartitionEvaluator { - private final long partitionId; +public class UnknownPartitionEvaluator implements OnePartitionEvaluator { + private final K partitionIdent; private final PartitionItem partitionItem; - public UnknownPartitionEvaluator(long partitionId, PartitionItem partitionItem) { - this.partitionId = partitionId; + public UnknownPartitionEvaluator(K partitionId, PartitionItem partitionItem) { + this.partitionIdent = partitionId; this.partitionItem = partitionItem; } @Override - public long getPartitionId() { - return partitionId; + public K getPartitionIdent() { + return partitionIdent; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index 2de4efab2ff6ed..9bec6570822240 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -76,7 +76,7 @@ public Rule build() { private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { - Map selectedPartitionItems = Maps.newHashMap(); + Map selectedPartitionItems = Maps.newHashMap(); if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. @@ -91,13 +91,13 @@ private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); - Map idToPartitionItem = scan.getSelectedPartitions().selectedPartitions; - List prunedPartitions = new ArrayList<>(PartitionPruner.prune( - partitionSlots, filter.getPredicate(), idToPartitionItem, ctx, PartitionTableType.HIVE)); + Map nameToPartitionItem = scan.getSelectedPartitions().selectedPartitions; + List prunedPartitions = new ArrayList<>(PartitionPruner.prune( + partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.HIVE)); - for (Long id : prunedPartitions) { - selectedPartitionItems.put(id, idToPartitionItem.get(id)); + for (String name : prunedPartitions) { + selectedPartitionItems.put(name, nameToPartitionItem.get(name)); } - return new SelectedPartitions(idToPartitionItem.size(), selectedPartitionItems, true); + return new SelectedPartitions(nameToPartitionItem.size(), selectedPartitionItems, true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 0a2c69b68c1d33..ab4ef8efa3c5df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -147,9 +147,9 @@ public static class SelectedPartitions { */ public final long totalPartitionNum; /** - * partition id -> partition item + * partition name -> partition item */ - public final Map selectedPartitions; + public final Map selectedPartitions; /** * true means the result is after partition pruning * false means the partition pruning is not processed. @@ -159,7 +159,7 @@ public static class SelectedPartitions { /** * Constructor for SelectedPartitions. */ - public SelectedPartitions(long totalPartitionNum, Map selectedPartitions, + public SelectedPartitions(long totalPartitionNum, Map selectedPartitions, boolean isPruned) { this.totalPartitionNum = totalPartitionNum; this.selectedPartitions = ImmutableMap.copyOf(Objects.requireNonNull(selectedPartitions, From c75ec0f0faacbef4ec27f8a2b55c1b94849c2c42 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 26 Nov 2024 11:51:28 +0800 Subject: [PATCH 2/4] [enhance](catalog)Unified external partition prune interface (#44567) Previously, external partition cropping only supported Hive. If you want to support other types of tables, you need to understand the internal processing logic of partition pruning. This PR abstracts the logic of partition pruning, and other tables can be implemented by simply covering a few methods of externalTable [opt](planner) Unified external partition prune interface --- .../doris/datasource/ExternalTable.java | 53 +++++++++++++++++++ .../datasource/hive/HMSExternalTable.java | 23 ++++++-- .../nereids/rules/analysis/BindRelation.java | 1 - .../rules/rewrite/PruneFileScanPartition.java | 18 +++---- .../trees/plans/logical/LogicalFileScan.java | 11 ++-- 5 files changed, 82 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 590a4cbe04625c..5c60384d331a1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableAttributes; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.constraint.Constraint; @@ -28,6 +29,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo; @@ -40,6 +42,7 @@ import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import lombok.Getter; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,9 +50,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; @@ -371,4 +376,52 @@ protected Optional getSchemaCacheValue() { ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); return cache.getSchemaValue(dbName, name); } + + /** + * Retrieve all partitions and initialize SelectedPartitions + * + * @param snapshotId if not support mvcc, ignore this + * @return + */ + public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { + if (!supportPartitionPruned()) { + return SelectedPartitions.NOT_PRUNED; + } + if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) { + return SelectedPartitions.NOT_PRUNED; + } + Map nameToPartitionItems = getNameToPartitionItems(snapshotId); + return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false); + } + + /** + * get partition map + * If partition related operations are supported, this method needs to be implemented in the subclass + * + * @param snapshotId if not support mvcc, ignore this + * @return partitionName ==> PartitionItem + */ + public Map getNameToPartitionItems(OptionalLong snapshotId) { + return Collections.emptyMap(); + } + + /** + * get partition column list + * If partition related operations are supported, this method needs to be implemented in the subclass + * + * @param snapshotId if not support mvcc, ignore this + * @return + */ + public List getPartitionColumns(OptionalLong snapshotId) { + return Collections.emptyList(); + } + + /** + * Does it support partition cpruned, If so, this method needs to be overridden in subclasses + * + * @return + */ + public boolean supportPartitionPruned() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 0ff020c130b992..567ac7e5f3c509 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -40,7 +40,6 @@ import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.nereids.exceptions.NotSupportedException; -import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; @@ -296,11 +295,25 @@ public List getPartitionColumns() { .orElse(Collections.emptyList()); } - public SelectedPartitions getAllPartitions() { + @Override + public List getPartitionColumns(OptionalLong snapshotId) { + return getPartitionColumns(); + } + + @Override + public boolean supportPartitionPruned() { + return getDlaType() == DLAType.HIVE; + } + + @Override + public Map getNameToPartitionItems(OptionalLong snapshotId) { + return getNameToPartitionItems(); + } + + public Map getNameToPartitionItems() { if (CollectionUtils.isEmpty(this.getPartitionColumns())) { - return SelectedPartitions.NOT_PRUNED; + return Collections.emptyMap(); } - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) this.getCatalog()); List partitionColumnTypes = this.getPartitionColumnTypes(); @@ -313,7 +326,7 @@ public SelectedPartitions getAllPartitions() { for (Entry entry : idToPartitionItem.entrySet()) { nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue()); } - return new SelectedPartitions(idToPartitionItem.size(), nameToPartitionItem, false); + return nameToPartitionItem; } public boolean isHiveTransactionalTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index d06647ee5153b1..e773d2721a9d6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -429,7 +429,6 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio } else { return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, qualifierWithoutTableName, - ((HMSExternalTable) table).getAllPartitions(), unboundRelation.getTableSample(), unboundRelation.getTableSnapshot()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index 9bec6570822240..d50219383072df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -19,8 +19,6 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; -import org.apache.doris.datasource.hive.HMSExternalTable; -import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; @@ -38,6 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -60,10 +59,8 @@ public Rule build() { ExternalTable tbl = scan.getTable(); SelectedPartitions selectedPartitions; - // TODO(cmy): support other external table - if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) { - HMSExternalTable hiveTbl = (HMSExternalTable) tbl; - selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext); + if (tbl.supportPartitionPruned()) { + selectedPartitions = pruneExternalPartitions(tbl, filter, scan, ctx.cascadesContext); } else { // set isPruned so that it won't go pass the partition prune again selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true); @@ -74,10 +71,11 @@ public Rule build() { }).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE); } - private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, + private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); - if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) { + // todo: real snapshotId + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(OptionalLong.empty()))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -85,8 +83,8 @@ private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, Map scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - - List partitionSlots = hiveTbl.getPartitionColumns() + // todo: real snapshotId + List partitionSlots = externalTable.getPartitionColumns(OptionalLong.empty()) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index ab4ef8efa3c5df..010c30d915d529 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; /** * Logical file scan for external catalog. @@ -59,17 +60,11 @@ protected LogicalFileScan(RelationId id, ExternalTable table, List quali this.tableSnapshot = tableSnapshot; } - public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, - SelectedPartitions selectedPartitions, - Optional tableSample, Optional tableSnapshot) { - this(id, table, qualifier, Optional.empty(), Optional.empty(), - selectedPartitions, tableSample, tableSnapshot); - } - public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, Optional tableSample, Optional tableSnapshot) { + // todo: real snapshotId this(id, table, qualifier, Optional.empty(), Optional.empty(), - SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot); + table.initSelectedPartitions(OptionalLong.empty()), tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { From 04e3579c9184e2e505fd67124c16630c7ba4d67b Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 28 Nov 2024 10:22:20 +0800 Subject: [PATCH 3/4] [feat](mtmv)Unified external table interface supporting partition refresh and partition pruning (#44673) - Add `MvccTable` to represent a table that supports querying specified version data - Add the `MvccSnapshot` interface to store snapshot information of mvcc at a certain moment in time - Add the `MvccSnapshot` parameter to the method of the `MTMVRelatedTableIf `interface to retrieve data of a specified version - Partition pruning related methods combined with the `MvccSnapshot` parameter are used to obtain partition information for a specified version - Load the snapshot information of mvccTable at the beginning of the query plan and store it in StatementContext Unified external table interface supporting partition refresh and partition pruning --- .../org/apache/doris/catalog/OlapTable.java | 23 ++++- .../doris/datasource/ExternalTable.java | 18 ++-- .../datasource/hive/HMSExternalTable.java | 36 ++++---- .../doris/datasource/mvcc/MvccSnapshot.java | 25 ++++++ .../doris/datasource/mvcc/MvccTable.java | 33 ++++++++ .../doris/datasource/mvcc/MvccTableInfo.java | 84 +++++++++++++++++++ .../paimon/PaimonExternalTable.java | 1 + .../mtmv/MTMVPartitionExprDateTrunc.java | 2 +- .../apache/doris/mtmv/MTMVPartitionInfo.java | 3 +- .../apache/doris/mtmv/MTMVPartitionUtil.java | 11 +-- ...MTMVRelatedPartitionDescInitGenerator.java | 3 +- ...MVRelatedPartitionDescRollUpGenerator.java | 3 +- .../apache/doris/mtmv/MTMVRelatedTableIf.java | 23 +++-- .../apache/doris/nereids/CascadesContext.java | 8 ++ .../apache/doris/nereids/NereidsPlanner.java | 2 +- .../doris/nereids/StatementContext.java | 31 +++++++ .../exploration/mv/MaterializedViewUtils.java | 4 +- .../rules/rewrite/PruneFileScanPartition.java | 6 +- .../plans/commands/info/CreateMTMVInfo.java | 2 +- .../info/MTMVPartitionDefinition.java | 3 +- .../trees/plans/logical/LogicalFileScan.java | 3 +- .../doris/mtmv/MTMVPartitionUtilTest.java | 5 +- 22 files changed, 272 insertions(+), 57 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index f9404e21fa24e9..0c6cbc828e4272 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -47,6 +47,7 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; @@ -983,6 +984,10 @@ public PartitionInfo getPartitionInfo() { } @Override + public Set getPartitionColumnNames(Optional snapshot) throws DdlException { + return getPartitionColumnNames(); + } + public Set getPartitionColumnNames() throws DdlException { Set partitionColumnNames = Sets.newHashSet(); if (partitionInfo instanceof SinglePartitionInfo) { @@ -3001,11 +3006,20 @@ public long getVisibleVersionTime() { } @Override + public PartitionType getPartitionType(Optional snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return partitionInfo.getType(); } @Override + public Map getAndCopyPartitionItems(Optional snapshot) + throws AnalysisException { + return getAndCopyPartitionItems(); + } + public Map getAndCopyPartitionItems() throws AnalysisException { if (!tryReadLock(1, TimeUnit.MINUTES)) { throw new AnalysisException( @@ -3026,12 +3040,17 @@ public Map getAndCopyPartitionItems() throws AnalysisExce } @Override + public List getPartitionColumns(Optional snapshot) { + return getPartitionColumns(); + } + public List getPartitionColumns() { return getPartitionInfo().getPartitionColumns(); } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException { Map partitionVersions = context.getBaseVersions().getPartitionVersions(); long partitionId = getPartitionOrAnalysisException(partitionName).getId(); @@ -3041,7 +3060,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) { Map tableVersions = context.getBaseVersions().getTableVersions(); long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion, id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 5c60384d331a1f..041f7e35c16cdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -29,6 +29,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; @@ -54,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; @@ -380,17 +380,17 @@ protected Optional getSchemaCacheValue() { /** * Retrieve all partitions and initialize SelectedPartitions * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return */ - public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { + public SelectedPartitions initSelectedPartitions(Optional snapshot) { if (!supportPartitionPruned()) { return SelectedPartitions.NOT_PRUNED; } - if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) { + if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) { return SelectedPartitions.NOT_PRUNED; } - Map nameToPartitionItems = getNameToPartitionItems(snapshotId); + Map nameToPartitionItems = getNameToPartitionItems(snapshot); return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false); } @@ -398,10 +398,10 @@ public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { * get partition map * If partition related operations are supported, this method needs to be implemented in the subclass * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return partitionName ==> PartitionItem */ - public Map getNameToPartitionItems(OptionalLong snapshotId) { + public Map getNameToPartitionItems(Optional snapshot) { return Collections.emptyMap(); } @@ -409,10 +409,10 @@ public Map getNameToPartitionItems(OptionalLong snapshotI * get partition column list * If partition related operations are supported, this method needs to be implemented in the subclass * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return */ - public List getPartitionColumns(OptionalLong snapshotId) { + public List getPartitionColumns(Optional snapshot) { return Collections.emptyList(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 567ac7e5f3c509..9254d68a4ac5b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -287,7 +288,6 @@ public List getPartitionColumnTypes() { .orElse(Collections.emptyList()); } - @Override public List getPartitionColumns() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); @@ -296,7 +296,7 @@ public List getPartitionColumns() { } @Override - public List getPartitionColumns(OptionalLong snapshotId) { + public List getPartitionColumns(Optional snapshot) { return getPartitionColumns(); } @@ -306,7 +306,7 @@ public boolean supportPartitionPruned() { } @Override - public Map getNameToPartitionItems(OptionalLong snapshotId) { + public Map getNameToPartitionItems(Optional snapshot) { return getNameToPartitionItems(); } @@ -757,34 +757,33 @@ public Set getDistributionColumnNames() { } @Override + public PartitionType getPartitionType(Optional snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override + public Set getPartitionColumnNames(Optional snapshot) { + return getPartitionColumnNames(); + } + public Set getPartitionColumnNames() { return getPartitionColumns().stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override - public Map getAndCopyPartitionItems() { - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) getCatalog()); - HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( - getDbName(), getName(), getPartitionColumnTypes()); - Map res = Maps.newHashMap(); - Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - BiMap idToName = hivePartitionValues.getPartitionNameToIdMap().inverse(); - for (Entry entry : idToPartitionItem.entrySet()) { - res.put(idToName.get(entry.getKey()), entry.getValue()); - } - return res; + + public Map getAndCopyPartitionItems(Optional snapshot) { + return getNameToPartitionItems(); } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) - throws AnalysisException { + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -796,7 +795,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) + throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java new file mode 100644 index 00000000000000..d7826b0a5de19e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +/** + * The snapshot information of mvcc is defined by each table, + * but it should be ensured that the table information queried through this snapshot remains unchanged + */ +public interface MvccSnapshot { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java new file mode 100644 index 00000000000000..d69e0f3114df0c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; + +/** + * The table that needs to query data based on the version needs to implement this interface. + */ +public interface MvccTable extends TableIf { + /** + * Retrieve the current snapshot information of the table, + * and the returned result will be used for the entire process of this query + * + * @return MvccSnapshot + */ + MvccSnapshot loadSnapshot(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java new file mode 100644 index 00000000000000..0d865f837c8c4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; + +import com.google.common.base.Objects; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class MvccTableInfo { + private static final Logger LOG = LogManager.getLogger(MvccTableInfo.class); + + private String tableName; + private String dbName; + private String ctlName; + + public MvccTableInfo(TableIf table) { + java.util.Objects.requireNonNull(table, "table is null"); + DatabaseIf database = table.getDatabase(); + java.util.Objects.requireNonNull(database, "database is null"); + CatalogIf catalog = database.getCatalog(); + java.util.Objects.requireNonNull(database, "catalog is null"); + this.tableName = table.getName(); + this.dbName = database.getFullName(); + this.ctlName = catalog.getName(); + } + + public String getTableName() { + return tableName; + } + + public String getDbName() { + return dbName; + } + + public String getCtlName() { + return ctlName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MvccTableInfo that = (MvccTableInfo) o; + return Objects.equal(tableName, that.tableName) && Objects.equal( + dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); + } + + @Override + public int hashCode() { + return Objects.hashCode(tableName, dbName, ctlName); + } + + @Override + public String toString() { + return "MvccTableInfo{" + + "tableName='" + tableName + '\'' + + ", dbName='" + dbName + '\'' + + ", ctlName='" + ctlName + '\'' + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 196b01efe2c0f1..07e4fd02ba90ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; public class PaimonExternalTable extends ExternalTable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java index ea15c84d1b925d..95a8717e01c4c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java @@ -69,7 +69,7 @@ public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits)); } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { Type partitionColumnType = MTMVPartitionUtil .getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index b3cd239269abc7..7eae44db0af4cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -25,6 +25,7 @@ import com.google.gson.annotations.SerializedName; import java.util.List; +import java.util.Optional; /** * MTMVPartitionInfo @@ -115,7 +116,7 @@ public int getRelatedColPos() throws AnalysisException { if (partitionType == MTMVPartitionType.SELF_MANAGE) { throw new AnalysisException("partitionType is: " + partitionType); } - List partitionColumns = getRelatedTable().getPartitionColumns(); + List partitionColumns = getRelatedTable().getPartitionColumns(Optional.empty()); for (int i = 0; i < partitionColumns.size(); i++) { if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) { return i; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 14b0f89ac76bcf..836a5f08bff2f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -329,7 +330,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt } for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -445,7 +446,7 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt if (!baseTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context); + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, Optional.empty()); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot); } @@ -481,7 +482,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); refreshPartitionSnapshot.getPartitions() .put(relatedPartitionName, partitionSnapshot); } @@ -496,13 +497,13 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres continue; } refreshPartitionSnapshot.addTableSnapshot(baseTableInfo, - ((MTMVRelatedTableIf) table).getTableSnapshot(context)); + ((MTMVRelatedTableIf) table).getTableSnapshot(context, Optional.empty())); } return refreshPartitionSnapshot; } public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, String col) throws AnalysisException { - List partitionColumns = relatedTable.getPartitionColumns(); + List partitionColumns = relatedTable.getPartitionColumns(Optional.empty()); for (Column column : partitionColumns) { if (column.getName().equals(col)) { return column.getType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java index 13b58239376116..c6b4e331184e2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java @@ -20,6 +20,7 @@ import org.apache.doris.common.AnalysisException; import java.util.Map; +import java.util.Optional; /** * get all related partition descs @@ -29,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti @Override public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, RelatedPartitionDescResult lastResult) throws AnalysisException { - lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems()); + lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(Optional.empty())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java index 76e20ef70f5d92..325fab819d9a09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; /** @@ -45,7 +46,7 @@ public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvPrope return; } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { lastResult.setDescs(rollUpRange(lastResult.getDescs(), mvPartitionInfo)); } else if (partitionType == PartitionType.LIST) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 4a8b14603ce4d6..974dd2b1b17063 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -23,9 +23,11 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -38,31 +40,35 @@ public interface MTMVRelatedTableIf extends TableIf { * Note: This method is called every time there is a refresh and transparent rewrite, * so if this method is slow, it will significantly reduce query performance * + * @param snapshot * @return partitionName->PartitionItem */ - Map getAndCopyPartitionItems() throws AnalysisException; + Map getAndCopyPartitionItems(Optional snapshot) throws AnalysisException; /** * getPartitionType LIST/RANGE/UNPARTITIONED * + * @param snapshot * @return */ - PartitionType getPartitionType(); + PartitionType getPartitionType(Optional snapshot); /** * getPartitionColumnNames * + * @param snapshot * @return * @throws DdlException */ - Set getPartitionColumnNames() throws DdlException; + Set getPartitionColumnNames(Optional snapshot) throws DdlException; /** * getPartitionColumns * + * @param snapshot * @return */ - List getPartitionColumns(); + List getPartitionColumns(Optional snapshot); /** * getPartitionSnapshot @@ -70,12 +76,14 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshot * @param partitionName * @param context * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException; /** * getTableSnapshot @@ -83,12 +91,13 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * + * @param snapshot * @param context * @return table snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException; - + MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) + throws AnalysisException; /** * Does the current type of table allow timed triggering * diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index 6d58089bed889e..90d8461c68e6c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -72,6 +72,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import org.apache.commons.collections.MapUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -478,6 +479,13 @@ private Set> getTables(LogicalPlan logicalPlan) { return tableNames; } + public Map, TableIf> getOrExtractTables(LogicalPlan logicalPlan) { + if (MapUtils.isEmpty(tables)) { + extractTables(logicalPlan); + } + return tables; + } + private Set> extractTableNamesFromHaving(LogicalHaving having) { Set subqueryExprs = having.getPredicate() .collect(SubqueryExpr.class::isInstance); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 299cd40da175cd..5cefde11e074d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -200,7 +200,7 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, plan = preprocess(plan); initCascadesContext(plan, requireProperties); - + statementContext.loadSnapshots(cascadesContext.getOrExtractTables(plan)); try (Lock lock = new Lock(plan, cascadesContext)) { Plan resultPlan = planWithoutLock(plan, explainLevel, showPlanProcess, requireProperties); lockCallback.accept(resultPlan); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 0503637ef95423..330abd274ec0a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -24,6 +24,9 @@ import org.apache.doris.common.Id; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.nereids.hint.Hint; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.rules.analysis.ColumnAliasGenerator; @@ -166,6 +169,8 @@ public class StatementContext implements Closeable { private List plannerHooks = new ArrayList<>(); + private final Map snapshots = Maps.newHashMap(); + public StatementContext() { this(ConnectContext.get(), null, 0); } @@ -500,6 +505,32 @@ public void addPlannerHook(PlannerHook plannerHook) { this.plannerHooks.add(plannerHook); } + /** + * Load snapshot information of mvcc + * + * @param tables Tables used in queries + */ + public void loadSnapshots(Map, TableIf> tables) { + if (tables == null) { + return; + } + for (TableIf tableIf : tables.values()) { + if (tableIf instanceof MvccTable) { + snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + } + } + } + + /** + * Obtain snapshot information of mvcc + * + * @param mvccTable mvccTable + * @return MvccSnapshot + */ + public MvccSnapshot getSnapshot(MvccTable mvccTable) { + return snapshots.get(new MvccTableInfo(mvccTable)); + } + private static class CloseableResource implements Closeable { public final String resourceName; public final String threadName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index 52d9afa48ea57f..e1f9a4a6a6ac3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -433,13 +433,13 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte return null; } MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table; - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (PartitionType.UNPARTITIONED.equals(type)) { context.addFailReason(String.format("related base table is not partition table, the table is %s", table.getName())); return null; } - Set partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns()); + Set partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns(Optional.empty())); Column mvReferenceColumn = contextPartitionColumn.getColumn().get(); Expr definExpr = mvReferenceColumn.getDefineExpr(); if (definExpr instanceof SlotRef) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index d50219383072df..4bbb0a8aa76270 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -36,7 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -75,7 +75,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); // todo: real snapshotId - if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(OptionalLong.empty()))) { + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -84,7 +84,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); // todo: real snapshotId - List partitionSlots = externalTable.getPartitionColumns(OptionalLong.empty()) + List partitionSlots = externalTable.getPartitionColumns(Optional.empty()) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index be87e6c71f2c9a..837d9478c89327 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -351,7 +351,7 @@ private PartitionDesc generatePartitionDesc(ConnectContext ctx) { allPartitionDescs.size(), ctx.getSessionVariable().getCreateTablePartitionMaxNum())); } try { - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (type == PartitionType.RANGE) { return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()), allPartitionDescs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java index 427e2368e7ab2b..c4117e8608e29d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java @@ -54,6 +54,7 @@ import com.google.common.collect.Sets; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -147,7 +148,7 @@ private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectCont MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo()); Set partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); try { - partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames()); + partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty())); } catch (DdlException e) { throw new AnalysisException(e.getMessage(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 010c30d915d529..96b8e032d11274 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -36,7 +36,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; /** * Logical file scan for external catalog. @@ -64,7 +63,7 @@ public LogicalFileScan(RelationId id, ExternalTable table, List qualifie Optional tableSample, Optional tableSnapshot) { // todo: real snapshotId this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(OptionalLong.empty()), tableSample, tableSnapshot); + table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 997385742dc09a..e5d2e21a8db626 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.util.List; +import java.util.Optional; import java.util.Set; public class MTMVPartitionUtilTest { @@ -112,7 +113,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getTableSnapshot((MTMVRefreshContext) any); + baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf; @@ -132,7 +133,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any); + baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf; From e4cc084fb2bc9a5fe72aebce3849dde9ab7d5126 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Fri, 29 Nov 2024 12:06:33 +0800 Subject: [PATCH 4/4] 1 --- .../org/apache/doris/datasource/paimon/PaimonExternalTable.java | 1 - .../src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java | 1 + .../src/main/java/org/apache/doris/nereids/CascadesContext.java | 2 +- .../main/java/org/apache/doris/nereids/StatementContext.java | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 07e4fd02ba90ac..196b01efe2c0f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -46,7 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; public class PaimonExternalTable extends ExternalTable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 974dd2b1b17063..c4261aa78f10be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -98,6 +98,7 @@ MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext con */ MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException; + /** * Does the current type of table allow timed triggering * diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index 90d8461c68e6c6..403d05f8c18f31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -479,7 +479,7 @@ private Set> getTables(LogicalPlan logicalPlan) { return tableNames; } - public Map, TableIf> getOrExtractTables(LogicalPlan logicalPlan) { + public Map getOrExtractTables(LogicalPlan logicalPlan) { if (MapUtils.isEmpty(tables)) { extractTables(logicalPlan); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 330abd274ec0a1..c6e50df5172ab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -510,7 +510,7 @@ public void addPlannerHook(PlannerHook plannerHook) { * * @param tables Tables used in queries */ - public void loadSnapshots(Map, TableIf> tables) { + public void loadSnapshots(Map tables) { if (tables == null) { return; }