Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
if (!isReplay) {
ConnectContext currentContext = ConnectContext.get();
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
true, currentContext);
mtmvCache = MTMVCache.from(this.getQuerySql(),
MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
true, true, currentContext);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -342,8 +343,9 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws
}
// Concurrent situations may result in duplicate cache generation,
// but we tolerate this in order to prevent nested use of readLock and write MvLock for the table
MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
false, connectionContext);
MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(),
MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
true, false, connectionContext);
writeMvLock();
try {
this.cache = mtmvCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void run() throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug("mtmv task run, taskId: {}", super.getTaskId());
}
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
try {
if (LOG.isDebugEnabled()) {
String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString();
Expand Down Expand Up @@ -350,7 +350,7 @@ private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, Strin
private void exec(Set<String> refreshPartitionNames,
Map<TableIf, String> tableWithPartKey)
throws Exception {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
StatementContext statementContext = new StatementContext();
for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
statementContext.setSnapshot(entry.getKey(), entry.getValue());
Expand Down
39 changes: 25 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -70,8 +70,28 @@

public class MTMVPlanUtil {

public static ConnectContext createMTMVContext(MTMV mtmv) {
ConnectContext ctx = createBasicMvContext(null);
// The rules should be disabled when generate MTMV cache
// Because these rules may change the plan structure and cause the plan can not match the mv
// this is mainly for final CBO phase rewrite, pre RBO phase does not need to consider, because
// maintain tmp plan alone for rewrite when pre RBO rewrite
public static final List<RuleType> DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE = ImmutableList.of(
RuleType.COMPRESSED_MATERIALIZE_AGG,
RuleType.COMPRESSED_MATERIALIZE_SORT,
RuleType.ELIMINATE_CONST_JOIN_CONDITION,
RuleType.CONSTANT_PROPAGATION,
RuleType.ADD_DEFAULT_LIMIT,
RuleType.ELIMINATE_JOIN_BY_FK,
RuleType.ELIMINATE_JOIN_BY_UK,
RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM,
RuleType.ELIMINATE_GROUP_BY,
RuleType.SALT_JOIN,
RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION
);
// The rules should be disabled when run MTMV task
public static final List<RuleType> DISABLE_RULES_WHEN_RUN_MTMV_TASK = DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE;

public static ConnectContext createMTMVContext(MTMV mtmv, List<RuleType> disableRules) {
ConnectContext ctx = createBasicMvContext(null, disableRules);
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
Expand All @@ -83,7 +103,8 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
return ctx;
}

public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext) {
public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext,
List<RuleType> disableRules) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
Expand All @@ -97,16 +118,6 @@ public static ConnectContext createBasicMvContext(@Nullable ConnectContext paren
ctx.getSessionVariable().skipStorageEngineMerge = false;
ctx.getSessionVariable().showHiddenColumns = false;
ctx.getSessionVariable().allowModifyMaterializedViewData = true;
// Rules disabled during materialized view plan generation. These rules can cause significant plan changes,
// which may affect transparent query rewriting by mv
List<RuleType> disableRules = Arrays.asList(
RuleType.COMPRESSED_MATERIALIZE_AGG,
RuleType.COMPRESSED_MATERIALIZE_SORT,
RuleType.ELIMINATE_CONST_JOIN_CONDITION,
RuleType.CONSTANT_PROPAGATION,
RuleType.ADD_DEFAULT_LIMIT,
RuleType.ELIMINATE_GROUP_BY
);
ctx.getSessionVariable().setDisableNereidsRules(
disableRules.stream().map(RuleType::name).collect(Collectors.joining(",")));
ctx.setStartTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,19 @@ protected void preMaterializedViewRewrite() {
LOG.debug("Start pre rewrite plan by mv");
}
List<Plan> tmpPlansForMvRewrite = cascadesContext.getStatementContext().getTmpPlanForMvRewrite();
Plan originalPlan = cascadesContext.getRewritePlan();
List<Plan> plansWhichContainMv = new ArrayList<>();
// because tmpPlansForMvRewrite only one, so timeout is cumulative which is ok
for (Plan planForRewrite : tmpPlansForMvRewrite) {
if (!planForRewrite.getLogicalProperties().equals(
cascadesContext.getRewritePlan().getLogicalProperties())) {
continue;
}
SessionVariable sessionVariable = cascadesContext.getConnectContext()
.getSessionVariable();
int timeoutSecond = sessionVariable.nereidsTimeoutSecond;
boolean enableTimeout = sessionVariable.enableNereidsTimeout;
try {
// set mv rewrite timeout
sessionVariable.nereidsTimeoutSecond = PreMaterializedViewRewriter.convertMillisToCeilingSeconds(
sessionVariable.materializedViewRewriteDurationThresholdMs);
sessionVariable.enableNereidsTimeout = true;
// pre rewrite
Plan rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
PreMaterializedViewRewriter::rewrite, planForRewrite, planForRewrite, true);
Expand All @@ -472,10 +478,19 @@ protected void preMaterializedViewRewrite() {
if (ruleOptimizedPlan == null) {
continue;
}
plansWhichContainMv.add(ruleOptimizedPlan);
// after rbo, maybe the plan changed a lot, so we need to normalize it with original plan
Plan normalizedPlan = MaterializedViewUtils.normalizeSinkExpressions(
ruleOptimizedPlan, originalPlan);
if (normalizedPlan != null) {
plansWhichContainMv.add(normalizedPlan);
}
} catch (Exception e) {
LOG.error("pre mv rewrite in rbo rewrite fail, query id is {}",
cascadesContext.getConnectContext().getQueryIdentifier(), e);

} finally {
sessionVariable.nereidsTimeoutSecond = timeoutSecond;
sessionVariable.enableNereidsTimeout = enableTimeout;
}
}
// clear the rewritten plans which are tmp optimized, should be filled by full optimize later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
// so there may be two filters we need to merge them
new MergeFilters()
),
custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION,
AggScalarSubQueryToWindowFunction::new),
bottomUp(
new EliminateUselessPlanUnderApply(),
// CorrelateApplyToUnCorrelateApply and ApplyToJoin
Expand All @@ -243,13 +241,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
* we expected.
*/
new CorrelateApplyToUnCorrelateApply(),
new ApplyToJoin(),
// UnCorrelatedApplyAggregateFilter rule will create new aggregate outputs,
// The later rule CheckPrivileges which inherent from ColumnPruning
// only works
// if the aggregation node is normalized, so we need call
// NormalizeAggregate here
new NormalizeAggregate()
new ApplyToJoin()
)
),
// before `Subquery unnesting` topic, some correlate slots should have appeared at
Expand Down Expand Up @@ -408,7 +400,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
custom(RuleType.ADJUST_NULLABLE, () -> new AdjustNullable(false))
),
topic("add projection for join",
// this is for hint project join rewrite rule
custom(RuleType.ADD_PROJECT_FOR_JOIN, AddProjectForJoin::new),
topDown(new MergeProjectable())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,25 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.types.VariantType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.TypeUtils;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.Statistics;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -450,6 +454,10 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
}
trySetStatistics(materializationContext, cascadesContext);
// Derive the operative column for materialized view scan
rewrittenPlan = deriveOperativeColumn(rewrittenPlan, queryStructInfo,
materializationContext.getShuttledExprToScanExprMapping(), viewToQuerySlotMapping,
materializationContext);
rewriteResults.add(rewrittenPlan);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
// If rewrite successfully, try to clear mv scan currently because it maybe used again
Expand Down Expand Up @@ -538,6 +546,51 @@ protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInf
return tempRewritedPlan;
}

/**
* Derive the operative column for materialized view scan, if the operative column in query can be
* represented by the operative column in materialized view, then set the operative column in
* materialized view scan, otherwise return the materialized view scan without operative column
*/
private static Plan deriveOperativeColumn(Plan rewrittenPlan, StructInfo queryStructInfo,
ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping,
MaterializationContext materializationContext) {
ExpressionMapping expressionMappingKeySourceBased = targetExpressionMapping.keyPermute(targetToSourceMapping);
// target to target replacement expression mapping, because mv is 1:1 so get first element
List<Map<Expression, Expression>> flattenExpressionMap = expressionMappingKeySourceBased.flattenMap();
Map<Expression, Expression> targetToTargetReplacementMappingQueryBased =
flattenExpressionMap.get(0);
final Multimap<NamedExpression, Slot> slotMapping = ArrayListMultimap.create();
for (Map.Entry<Expression, Expression> entry : targetToTargetReplacementMappingQueryBased.entrySet()) {
if (entry.getValue() instanceof Slot) {
entry.getKey().collect(NamedExpression.class::isInstance).forEach(
namedExpression -> slotMapping.put(
(NamedExpression) namedExpression, (Slot) entry.getValue()));
}
}
Set<Slot> operativeSlots = new HashSet<>();
for (CatalogRelation relation : queryStructInfo.getRelations()) {
List<Slot> relationOperativeSlots = relation.getOperativeSlots();
if (relationOperativeSlots.isEmpty()) {
continue;
}
for (Slot slot : relationOperativeSlots) {
Collection<Slot> mvOutputSlots = slotMapping.get(slot);
if (!mvOutputSlots.isEmpty()) {
operativeSlots.addAll(mvOutputSlots);
}
}
}
return rewrittenPlan.accept(new DefaultPlanRewriter<MaterializationContext>() {
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, MaterializationContext context) {
if (context.generateMaterializationIdentifier().equals(olapScan.getTable().getFullQualifiers())) {
return olapScan.withOperativeSlots(operativeSlots);
}
return super.visitLogicalOlapScan(olapScan, context);
}
}, materializationContext);
}

/**
* Use target expression to represent the source expression. Visit the source expression,
* try to replace the source expression with target expression in targetExpressionMapping, if found then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ private List<MaterializationContext> createSyncMvContexts(OlapTable olapTable,
continue;
}
ConnectContext basicMvContext = MTMVPlanUtil.createBasicMvContext(
cascadesContext.getConnectContext());
cascadesContext.getConnectContext(),
MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE);
basicMvContext.setDatabase(meta.getDbName());
MTMVCache mtmvCache = MTMVCache.from(querySql.get(),
basicMvContext, true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ protected Statistics normalizeStatisticsColumnExpression(Statistics originalPlan
if (sourceExpression != null && targetExpression instanceof NamedExpression
&& sourceExpression instanceof NamedExpression) {
normalizedExpressionMap.put(MaterializedViewUtils.normalizeExpression(
(NamedExpression) sourceExpression, (NamedExpression) targetExpression).toSlot(),
(NamedExpression) sourceExpression, (NamedExpression) targetExpression, false).toSlot(),
entry.getValue());
}
}
Expand Down
Loading
Loading