10 changes: 6 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -216,8 +216,9 @@ public boolean addTaskResult(MTMVTask task, MTMVRelation relation,
             if (!isReplay) {
                 ConnectContext currentContext = ConnectContext.get();
                 // shouldn't do this while holding mvWriteLock
-                mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
-                        true, currentContext);
+                mtmvCache = MTMVCache.from(this.getQuerySql(),
+                        MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
+                        true, true, currentContext);
             }
         } catch (Throwable e) {
             mtmvCache = null;
@@ -351,8 +352,9 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws
         }
         // Concurrent situations may result in duplicate cache generation,
         // but we tolerate this in order to prevent nested use of readLock and write MvLock for the table
-        MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
-                false, connectionContext);
+        MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(),
+                MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE),
+                true, false, connectionContext);
         writeMvLock();
         try {
             this.cache = mtmvCache;
@@ -182,7 +182,7 @@ public void run() throws JobException {
             LOG.debug("mtmv task run, taskId: {}", super.getTaskId());
         }
         mtmvSchemaChangeVersion = mtmv.getSchemaChangeVersion();
-        ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+        ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
         try {
             if (LOG.isDebugEnabled()) {
                 String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString();
@@ -320,7 +320,7 @@ private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String>
     private void exec(Set<String> refreshPartitionNames,
             Map<TableIf, String> tableWithPartKey)
             throws Exception {
-        ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+        ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
         StatementContext statementContext = new StatementContext();
         for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
             statementContext.setSnapshot(entry.getKey(), entry.getValue());
39 changes: 25 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -78,14 +78,14 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -96,8 +96,28 @@
 public class MTMVPlanUtil {
     private static final Logger LOG = LogManager.getLogger(MTMVPlanUtil.class);
 
-    public static ConnectContext createMTMVContext(MTMV mtmv) {
-        ConnectContext ctx = createBasicMvContext(null);
+    // These rules should be disabled when generating the MTMV cache, because they may change
+    // the plan structure and cause the plan to no longer match the mv. This mainly matters for
+    // the final CBO-phase rewrite; the pre-RBO phase maintains a separate tmp plan for its
+    // rewrite, so it does not need to be considered here.
+    public static final List<RuleType> DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE = ImmutableList.of(
+            RuleType.COMPRESSED_MATERIALIZE_AGG,
+            RuleType.COMPRESSED_MATERIALIZE_SORT,
+            RuleType.ELIMINATE_CONST_JOIN_CONDITION,
+            RuleType.CONSTANT_PROPAGATION,
+            RuleType.ADD_DEFAULT_LIMIT,
+            RuleType.ELIMINATE_JOIN_BY_FK,
+            RuleType.ELIMINATE_JOIN_BY_UK,
+            RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM,
+            RuleType.ELIMINATE_GROUP_BY,
+            RuleType.SALT_JOIN,
+            RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION
+    );
+    // The rules that should be disabled when running an MTMV task
+    public static final List<RuleType> DISABLE_RULES_WHEN_RUN_MTMV_TASK = DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE;
+
+    public static ConnectContext createMTMVContext(MTMV mtmv, List<RuleType> disableRules) {
+        ConnectContext ctx = createBasicMvContext(null, disableRules);
         Optional<String> workloadGroup = mtmv.getWorkloadGroup();
         if (workloadGroup.isPresent()) {
             ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
@@ -109,7 +129,8 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
         return ctx;
     }
 
-    public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext) {
+    public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext,
+            List<RuleType> disableRules) {
         ConnectContext ctx = new ConnectContext();
         ctx.setEnv(Env.getCurrentEnv());
         ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
@@ -123,16 +144,6 @@ public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext) {
         ctx.getSessionVariable().skipStorageEngineMerge = false;
         ctx.getSessionVariable().showHiddenColumns = false;
         ctx.getSessionVariable().allowModifyMaterializedViewData = true;
-        // Rules disabled during materialized view plan generation. These rules can cause significant plan changes,
-        // which may affect transparent query rewriting by mv
-        List<RuleType> disableRules = Arrays.asList(
-                RuleType.COMPRESSED_MATERIALIZE_AGG,
-                RuleType.COMPRESSED_MATERIALIZE_SORT,
-                RuleType.ELIMINATE_CONST_JOIN_CONDITION,
-                RuleType.CONSTANT_PROPAGATION,
-                RuleType.ADD_DEFAULT_LIMIT,
-                RuleType.ELIMINATE_GROUP_BY
-        );
         ctx.getSessionVariable().setDisableNereidsRules(
                 disableRules.stream().map(RuleType::name).collect(Collectors.joining(",")));
         ctx.setStartTime();
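For orientation, here is a minimal, self-contained sketch of the mechanism above: the RuleType names in a disable list are joined with commas and handed to SessionVariable.setDisableNereidsRules. Only that joining step mirrors the diff; the enum, class name, and main wrapper below are illustrative stand-ins, not Doris code.

import java.util.List;
import java.util.stream.Collectors;

public class DisableRulesSketch {
    // Illustrative stand-in for org.apache.doris.nereids.rules.RuleType
    enum RuleType { COMPRESSED_MATERIALIZE_AGG, ELIMINATE_GROUP_BY, SALT_JOIN }

    public static void main(String[] args) {
        List<RuleType> disableRules = List.of(
                RuleType.COMPRESSED_MATERIALIZE_AGG,
                RuleType.ELIMINATE_GROUP_BY,
                RuleType.SALT_JOIN);
        // Same transformation createBasicMvContext applies before calling
        // ctx.getSessionVariable().setDisableNereidsRules(joined)
        String joined = disableRules.stream()
                .map(Enum::name)
                .collect(Collectors.joining(","));
        System.out.println(joined); // COMPRESSED_MATERIALIZE_AGG,ELIMINATE_GROUP_BY,SALT_JOIN
    }
}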
@@ -452,13 +452,19 @@ protected void preMaterializedViewRewrite() {
             LOG.debug("Start pre rewrite plan by mv");
         }
         List<Plan> tmpPlansForMvRewrite = cascadesContext.getStatementContext().getTmpPlanForMvRewrite();
+        Plan originalPlan = cascadesContext.getRewritePlan();
         List<Plan> plansWhichContainMv = new ArrayList<>();
+        // tmpPlansForMvRewrite has only one element, so the accumulated timeout is acceptable
         for (Plan planForRewrite : tmpPlansForMvRewrite) {
-            if (!planForRewrite.getLogicalProperties().equals(
-                    cascadesContext.getRewritePlan().getLogicalProperties())) {
-                continue;
-            }
+            SessionVariable sessionVariable = cascadesContext.getConnectContext()
+                    .getSessionVariable();
+            int timeoutSecond = sessionVariable.nereidsTimeoutSecond;
+            boolean enableTimeout = sessionVariable.enableNereidsTimeout;
             try {
+                // set mv rewrite timeout
+                sessionVariable.nereidsTimeoutSecond = PreMaterializedViewRewriter.convertMillisToCeilingSeconds(
+                        sessionVariable.materializedViewRewriteDurationThresholdMs);
+                sessionVariable.enableNereidsTimeout = true;
                 // pre rewrite
                 Plan rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
                         PreMaterializedViewRewriter::rewrite, planForRewrite, planForRewrite, true);
@@ -470,10 +476,19 @@
                 if (ruleOptimizedPlan == null) {
                     continue;
                 }
-                plansWhichContainMv.add(ruleOptimizedPlan);
+                // after RBO the plan may have changed a lot, so normalize it against the original plan
+                Plan normalizedPlan = MaterializedViewUtils.normalizeSinkExpressions(
+                        ruleOptimizedPlan, originalPlan);
+                if (normalizedPlan != null) {
+                    plansWhichContainMv.add(normalizedPlan);
+                }
             } catch (Exception e) {
                 LOG.error("pre mv rewrite in rbo rewrite fail, query id is {}",
                         cascadesContext.getConnectContext().getQueryIdentifier(), e);
+
+            } finally {
+                sessionVariable.nereidsTimeoutSecond = timeoutSecond;
+                sessionVariable.enableNereidsTimeout = enableTimeout;
             }
         }
         // clear the rewritten plans which are tmp optimized, should be filled by full optimize later
@@ -228,8 +228,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
                         // so there may be two filters we need to merge them
                         new MergeFilters()
                 ),
-                custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION,
-                        AggScalarSubQueryToWindowFunction::new),
                 bottomUp(
                         new EliminateUselessPlanUnderApply(),
                         // CorrelateApplyToUnCorrelateApply and ApplyToJoin

Contributor: why remove this?

Contributor (Author): The rule AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION generates window functions. Since the current code does not support window-function rewriting, it would cause the transparent query rewrite using materialized views to fail. Therefore, this rule has been disabled in the rule package used for transparent query rewriting.
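To make the explanation above concrete, here is an illustrative sketch of the query shape involved. The SQL strings are assumptions based on the rule's name and the author's comment, not queries taken from the PR.

public class AggScalarSubqueryShape {
    // Shape before the rule: a scalar aggregate subquery computes one value.
    static final String BEFORE = "SELECT o_orderkey FROM orders "
            + "WHERE o_totalprice > (SELECT AVG(o_totalprice) FROM orders)";

    // Hypothetical shape after the rule: the aggregate becomes a window function.
    // A window function in the plan cannot currently be matched against an mv,
    // which is why the rule is disabled for transparent rewriting.
    static final String AFTER = "SELECT o_orderkey FROM ("
            + "SELECT o_orderkey, o_totalprice, AVG(o_totalprice) OVER () AS avg_price "
            + "FROM orders) t WHERE o_totalprice > avg_price";

    public static void main(String[] args) {
        System.out.println(BEFORE);
        System.out.println(AFTER);
    }
}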
@@ -244,13 +242,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
                          * we expected.
                          */
                         new CorrelateApplyToUnCorrelateApply(),
-                        new ApplyToJoin(),
-                        // UnCorrelatedApplyAggregateFilter rule will create new aggregate outputs,
-                        // The later rule CheckPrivileges which inherent from ColumnPruning
-                        // only works
-                        // if the aggregation node is normalized, so we need call
-                        // NormalizeAggregate here
-                        new NormalizeAggregate()
+                        new ApplyToJoin()
                 )
         ),
         // before `Subquery unnesting` topic, some correlate slots should have appeared at

Contributor: and this?

Contributor (Author): The NormalizeAggregate rule following ApplyToJoin has been removed. This aligns with the usage of RBO rules in the main workflow, primarily for performance optimization. The NormalizeAggregate operation will now be performed uniformly in a separate subsequent stage.
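As a toy model of the staging idea in the answer above (all names below are invented for illustration): each rewrite stage stays single-purpose, and normalization runs once in a later stage instead of inline inside subquery unnesting.

import java.util.List;
import java.util.function.UnaryOperator;

public class StagedRewriteSketch {
    public static void main(String[] args) {
        // Stand-ins for rewrite stages; a real stage transforms a plan tree.
        UnaryOperator<String> applyToJoin = plan -> plan + " -> ApplyToJoin";
        UnaryOperator<String> normalizeAggregate = plan -> plan + " -> NormalizeAggregate";

        // ApplyToJoin no longer normalizes as a side effect; normalization is its own stage.
        List<UnaryOperator<String>> stages = List.of(applyToJoin, normalizeAggregate);
        String plan = "LogicalPlan";
        for (UnaryOperator<String> stage : stages) {
            plan = stage.apply(plan);
        }
        System.out.println(plan); // LogicalPlan -> ApplyToJoin -> NormalizeAggregate
    }
}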
@@ -409,7 +401,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
                 custom(RuleType.ADJUST_NULLABLE, () -> new AdjustNullable(false))
         ),
         topic("add projection for join",
-                // this is for hint project join rewrite rule
                 custom(RuleType.ADD_PROJECT_FOR_JOIN, AddProjectForJoin::new),
                 topDown(new MergeProjectable())
         )
@@ -59,21 +59,25 @@
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 import org.apache.doris.nereids.types.VariantType;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.TypeUtils;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.statistics.Statistics;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -450,6 +454,10 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext cascadesContext,
                 }
             }
             trySetStatistics(materializationContext, cascadesContext);
+            // Derive the operative columns for the materialized view scan
+            rewrittenPlan = deriveOperativeColumn(rewrittenPlan, queryStructInfo,
+                    materializationContext.getShuttledExprToScanExprMapping(), viewToQuerySlotMapping,
+                    materializationContext);
             rewriteResults.add(rewrittenPlan);
             recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
             // If rewrite successfully, try to clear mv scan currently because it maybe used again
@@ -539,6 +547,51 @@ protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInfo,
         return tempRewritedPlan;
     }
 
+    /**
+     * Derive the operative columns for the materialized view scan. If an operative column in the
+     * query can be represented by an operative column in the materialized view, set it on the
+     * materialized view scan; otherwise return the materialized view scan without operative columns.
+     */
+    private static Plan deriveOperativeColumn(Plan rewrittenPlan, StructInfo queryStructInfo,
+            ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping,
+            MaterializationContext materializationContext) {
+        ExpressionMapping expressionMappingKeySourceBased = targetExpressionMapping.keyPermute(targetToSourceMapping);
+        // target to target replacement expression mapping, because mv is 1:1 so get first element
+        List<Map<Expression, Expression>> flattenExpressionMap = expressionMappingKeySourceBased.flattenMap();
+        Map<Expression, Expression> targetToTargetReplacementMappingQueryBased =
+                flattenExpressionMap.get(0);
+        final Multimap<NamedExpression, Slot> slotMapping = ArrayListMultimap.create();
+        for (Map.Entry<Expression, Expression> entry : targetToTargetReplacementMappingQueryBased.entrySet()) {
+            if (entry.getValue() instanceof Slot) {
+                entry.getKey().collect(NamedExpression.class::isInstance).forEach(
+                        namedExpression -> slotMapping.put(
+                                (NamedExpression) namedExpression, (Slot) entry.getValue()));
+            }
+        }
+        Set<Slot> operativeSlots = new HashSet<>();
+        for (CatalogRelation relation : queryStructInfo.getRelations()) {
+            List<Slot> relationOperativeSlots = relation.getOperativeSlots();
+            if (relationOperativeSlots.isEmpty()) {
+                continue;
+            }
+            for (Slot slot : relationOperativeSlots) {
+                Collection<Slot> mvOutputSlots = slotMapping.get(slot);
+                if (!mvOutputSlots.isEmpty()) {
+                    operativeSlots.addAll(mvOutputSlots);
+                }
+            }
+        }
+        return rewrittenPlan.accept(new DefaultPlanRewriter<MaterializationContext>() {
+            @Override
+            public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, MaterializationContext context) {
+                if (context.generateMaterializationIdentifier().equals(olapScan.getTable().getFullQualifiers())) {
+                    return olapScan.withOperativeSlots(operativeSlots);
+                }
+                return super.visitLogicalOlapScan(olapScan, context);
+            }
+        }, materializationContext);
+    }
+
     /**
      * Use target expression to represent the source expression. Visit the source expression,
      * try to replace the source expression with target expression in targetExpressionMapping, if found then
@@ -278,7 +278,8 @@ private List<MaterializationContext> createSyncMvContexts(OlapTable olapTable,
                 continue;
             }
             ConnectContext basicMvContext = MTMVPlanUtil.createBasicMvContext(
-                    cascadesContext.getConnectContext());
+                    cascadesContext.getConnectContext(),
+                    MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE);
             basicMvContext.setDatabase(meta.getDbName());
             MTMVCache mtmvCache = MTMVCache.from(querySql.get(),
                     basicMvContext, true,
@@ -276,7 +276,7 @@ protected Statistics normalizeStatisticsColumnExpression(Statistics originalPlan
             if (sourceExpression != null && targetExpression instanceof NamedExpression
                     && sourceExpression instanceof NamedExpression) {
                 normalizedExpressionMap.put(MaterializedViewUtils.normalizeExpression(
-                        (NamedExpression) sourceExpression, (NamedExpression) targetExpression).toSlot(),
+                        (NamedExpression) sourceExpression, (NamedExpression) targetExpression, false).toSlot(),
                         entry.getValue());
             }
         }