Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
viewStructInfo, materializationContext)) {
List<Expression> rewrittenQueryExpressions = rewriteExpression(queryTopPlan.getOutput(),
queryTopPlan,
materializationContext.getExprToScanExprMapping(),
materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping,
true,
queryStructInfo.getTableBitSet());
Expand All @@ -121,7 +121,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
() -> String.format("expressionToWrite = %s,\n mvExprToMvScanExprMapping = %s,\n"
+ "viewToQuerySlotMapping = %s",
queryTopPlan.getOutput(),
materializationContext.getExprToScanExprMapping(),
materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping));
}
// if view is scalar aggregate but query is not. Or if query is scalar aggregate but view is not
Expand Down Expand Up @@ -150,7 +150,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
List<? extends Expression> queryExpressions = queryTopPlan.getOutput();
// permute the mv expr mapping to query based
Map<Expression, Expression> mvExprToMvScanExprQueryBased =
materializationContext.getExprToScanExprMapping().keyPermute(viewToQuerySlotMapping)
materializationContext.getShuttledExprToScanExprMapping().keyPermute(viewToQuerySlotMapping)
.flattenMap().get(0);
for (Expression topExpression : queryExpressions) {
// if agg function, try to roll up and rewrite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
List<Expression> expressionsRewritten = rewriteExpression(
queryStructInfo.getExpressions(),
queryStructInfo.getTopPlan(),
materializationContext.getExprToScanExprMapping(),
materializationContext.getShuttledExprToScanExprMapping(),
targetToSourceMapping,
true,
queryStructInfo.getTableBitSet()
Expand All @@ -57,7 +57,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
"Rewrite expressions by view in join fail",
() -> String.format("expressionToRewritten is %s,\n mvExprToMvScanExprMapping is %s,\n"
+ "targetToSourceMapping = %s", queryStructInfo.getExpressions(),
materializationContext.getExprToScanExprMapping(),
materializationContext.getShuttledExprToScanExprMapping(),
targetToSourceMapping));
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,14 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
} else {
// Try to rewrite compensate predicates by using mv scan
List<Expression> rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(),
queryPlan, materializationContext.getExprToScanExprMapping(),
queryPlan, materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet());
if (rewriteCompensatePredicates.isEmpty()) {
materializationContext.recordFailReason(queryStructInfo,
"Rewrite compensate predicate by view fail",
() -> String.format("compensatePredicates = %s,\n mvExprToMvScanExprMapping = %s,\n"
+ "viewToQuerySlotMapping = %s",
compensatePredicates, materializationContext.getExprToScanExprMapping(),
compensatePredicates, materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping));
continue;
}
Expand Down Expand Up @@ -325,19 +325,23 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
continue;
}
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext);
Optional<Pair<Id, Statistics>> materializationPlanStatistics =
materializationContext.getPlanStatistics(cascadesContext);
if (materializationPlanStatistics.isPresent() && materializationPlanStatistics.get().key() != null) {
cascadesContext.getStatementContext().addStatistics(
materializationPlanStatistics.get().key(), materializationPlanStatistics.get().value());
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
}
return rewriteResults;
}

// Set materialization context statistics to statementContext for cost estimate later
private static void trySetStatistics(MaterializationContext context, CascadesContext cascadesContext) {
Optional<Pair<Id, Statistics>> materializationPlanStatistics = context.getPlanStatistics(cascadesContext);
if (materializationPlanStatistics.isPresent() && materializationPlanStatistics.get().key() != null) {
cascadesContext.getStatementContext().addStatistics(materializationPlanStatistics.get().key(),
materializationPlanStatistics.get().value());
}
}

private boolean needUnionRewrite(
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> invalidPartitions,
CascadesContext cascadesContext) {
Expand Down Expand Up @@ -520,8 +524,9 @@ protected List<Expression> rewriteExpression(List<? extends Expression> sourceEx
/**
* Normalize expression with query, keep the consistency of exprId and nullable props with
* query
* Keep the replacedExpression slot property is the same as the sourceExpression
*/
private NamedExpression normalizeExpression(
public static NamedExpression normalizeExpression(
NamedExpression sourceExpression, NamedExpression replacedExpression) {
Expression innerExpression = replacedExpression;
if (replacedExpression.nullable() != sourceExpression.nullable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascades
if (!logicalOlapScan.isEmpty()) {
relationId = logicalOlapScan.get(0).getRelationId();
}
return Optional.of(Pair.of(relationId, mtmvCache.getStatistics()));
return Optional.of(Pair.of(relationId, normalizeStatisticsColumnExpression(mtmvCache.getStatistics())));
}

@Override
Expand All @@ -131,8 +131,8 @@ public List<Table> getBaseViews() {
return baseViews;
}

public ExpressionMapping getExprToScanExprMapping() {
return exprToScanExprMapping;
public ExpressionMapping getShuttledExprToScanExprMapping() {
return shuttledExprToScanExprMapping;
}

public boolean isAvailable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
Expand All @@ -35,6 +37,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;

import com.google.common.collect.HashMultimap;
Expand Down Expand Up @@ -63,18 +66,21 @@ public abstract class MaterializationContext {
protected List<Table> baseTables;
protected List<Table> baseViews;
// The plan of materialization def sql
protected Plan plan;
protected final Plan plan;
// The original plan of materialization sql
protected Plan originalPlan;
protected final Plan originalPlan;
// Should regenerate when materialization is already rewritten successfully because one query may hit repeatedly
// make sure output is different in multi using
protected Plan scanPlan;
// The materialization plan output shuttled expression, this is used by generate field
// exprToScanExprMapping
protected List<? extends Expression> planOutputShuttledExpressions;
// Generated mapping from materialization plan out expr to materialization scan plan out slot mapping,
// this is used for later
protected Map<Expression, Expression> exprToScanExprMapping = new HashMap<>();
// Generated mapping from materialization plan out shuttled expr to materialization scan plan out slot mapping,
// this is used for later used
protected ExpressionMapping exprToScanExprMapping;
// this is used for expression rewrite
protected ExpressionMapping shuttledExprToScanExprMapping;
// This mark the materialization context is available or not,
// will not be used in query transparent rewritten if false
protected boolean available = true;
Expand Down Expand Up @@ -106,15 +112,19 @@ public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement();
this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain()
&& ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel();

this.planOutputShuttledExpressions = ExpressionUtils.shuttleExpressionWithLineage(
originalPlan.getOutput(),
originalPlan,
new BitSet());
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
}
}
this.planOutputShuttledExpressions = ExpressionUtils.shuttleExpressionWithLineage(originalPlanOutput,
originalPlan, new BitSet());
// materialization output expression shuttle, this will be used to expression rewrite
this.exprToScanExprMapping = ExpressionMapping.generate(
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
scanPlanOutput);
// Construct materialization struct info, catch exception which may cause planner roll back
if (structInfo == null) {
Optional<StructInfo> structInfoOptional = constructStructInfo(plan, cascadesContext, new BitSet());
Expand Down Expand Up @@ -170,9 +180,18 @@ public void addMatchedGroup(GroupId groupId, boolean rewriteSuccess) {
public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
this.scanPlan = doGenerateScanPlan(cascadesContext);
// materialization output expression shuttle, this will be used to expression rewrite
this.exprToScanExprMapping = ExpressionMapping.generate(
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
Map<Expression, Expression> regeneratedMapping = new HashMap<>();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
}
}
this.exprToScanExprMapping = regeneratedMapping;
}

public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) {
Expand Down Expand Up @@ -202,12 +221,33 @@ public SlotMapping getSlotMappingFromCache(RelationMapping relationMapping) {
abstract String getStringInfo();

/**
* Get materialization plan statistics, the key is the identifier of statistics
* the value is Statistics.
* Get materialization plan statistics,
* the key is the identifier of statistics which is usual the scan plan relationId or something similar
* the value is original plan statistics.
* the statistics is used by cost estimation when the materialization is used
* Which should be the materialization origin plan statistics
*/
abstract Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascadesContext);

// original plan statistics is generated by origin plan, and the column expression in statistics
// should be keep consistent to mv scan plan
protected Statistics normalizeStatisticsColumnExpression(Statistics originalPlanStatistics) {
Map<Expression, ColumnStatistic> normalizedExpressionMap = new HashMap<>();
// this statistics column expression is materialization origin plan, should normalize it to
// materialization scan plan
for (Map.Entry<Expression, ColumnStatistic> entry : originalPlanStatistics.columnStatistics().entrySet()) {
Expression targetExpression = entry.getKey();
Expression sourceExpression = this.getExprToScanExprMapping().get(targetExpression);
if (sourceExpression != null && targetExpression instanceof NamedExpression
&& sourceExpression instanceof NamedExpression) {
normalizedExpressionMap.put(AbstractMaterializedViewRule.normalizeExpression(
(NamedExpression) sourceExpression, (NamedExpression) targetExpression).toSlot(),
entry.getValue());
}
}
return originalPlanStatistics.withExpressionToColumnStats(normalizedExpressionMap);
}

/**
* Calc the relation is chosen finally or not
*/
Expand All @@ -233,10 +273,14 @@ public List<Table> getBaseViews() {
return baseViews;
}

public ExpressionMapping getExprToScanExprMapping() {
public Map<Expression, Expression> getExprToScanExprMapping() {
return exprToScanExprMapping;
}

public ExpressionMapping getShuttledExprToScanExprMapping() {
return shuttledExprToScanExprMapping;
}

public boolean isAvailable() {
return available;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
List<Expression> expressionsRewritten = rewriteExpression(
queryStructInfo.getExpressions(),
queryStructInfo.getTopPlan(),
materializationContext.getExprToScanExprMapping(),
materializationContext.getShuttledExprToScanExprMapping(),
targetToSourceMapping,
true,
queryStructInfo.getTableBitSet()
Expand All @@ -58,7 +58,7 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
"Rewrite expressions by view in scan fail",
() -> String.format("expressionToRewritten is %s,\n mvExprToMvScanExprMapping is %s,\n"
+ "targetToSourceMapping = %s", queryStructInfo.getExpressions(),
materializationContext.getExprToScanExprMapping(),
materializationContext.getShuttledExprToScanExprMapping(),
targetToSourceMapping));
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public Statistics withRowCount(double rowCount) {
return new Statistics(rowCount, widthInJoinCluster, new HashMap<>(expressionToColumnStats));
}

public Statistics withExpressionToColumnStats(Map<Expression, ColumnStatistic> expressionToColumnStats) {
return new Statistics(rowCount, widthInJoinCluster, expressionToColumnStats);
}

/**
* Update by count.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.mtmv.MTMVRelationManager;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.sqltest.SqlTestBase;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -34,7 +35,7 @@

import java.util.BitSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* Test idStatisticsMap in StatementContext is valid
Expand Down Expand Up @@ -70,16 +71,19 @@ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx) {
+ "inner join T3 on T1.id = T3.id",
connectContext
);
PlanChecker.from(c1)
PlanChecker tmpPlanChecker = PlanChecker.from(c1)
.analyze()
.rewrite()
.rewrite();
// scan plan output will be refreshed after mv rewrite successfully, so need tmp store
Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0).getScanPlan().getOutputSet();
tmpPlanChecker
.optimize()
.printlnBestPlanTree();
Map<RelationId, Statistics> idStatisticsMap = c1.getStatementContext().getRelationIdToStatisticsMap();
Assertions.assertFalse(idStatisticsMap.isEmpty());
RelationId relationId = idStatisticsMap.keySet().iterator().next();
Optional<Statistics> statistics = c1.getStatementContext().getStatistics(relationId);
Assertions.assertTrue(statistics.isPresent());
Statistics statistics = idStatisticsMap.values().iterator().next();
// statistics key set should be equals to materialization scan plan output
Assertions.assertEquals(materializationScanOutput, statistics.columnStatistics().keySet());
dropMvByNereids("drop materialized view mv100");
}
}