From daf8c0b6ba5199977a4838970ebdd652f51604f6 Mon Sep 17 00:00:00 2001 From: seawinde Date: Mon, 10 Nov 2025 11:09:56 +0800 Subject: [PATCH] [fix](mtmv) Fix rewrite fail when join conjuncts eliminate and group by key eliminate both happened (#55674) if mv def is as fllowing: CREATE MATERIALIZED VIEW mv1 BUILD IMMEDIATE REFRESH AUTO PROPERTIES ( "replication_num" = "1" ) as select a.cust_id, a.event_type_id, a.bind_attr, sum(sum_amount) sum_amount, date_format(a.created_date, '%Y%m') month, a.created_date from data_event a join customer b on(a.cust_id=b.cust_id) group by a.cust_id, a.event_type_id, a.bind_attr, month, a.created_date; if query is as fllowing, the query pattern, the query can be optimized by rule `ELIMINATE_CONST_JOIN_CONDITION` and `ELIMINATE_GROUP_BY_KEY_BY_UNIFORM` this would cause rewrite fail by mv, the pr fix this select a.cust_id, a.event_type_id, a.bind_attr, sum(sum_amount) sum_amount, date_format(created_date, '%Y%m') month from data_event a inner join customer b on (a.cust_id = b.cust_id) where a.cust_id = 410723002257 and date_format(a.created_date, '%Y%m') = '202509' group by a.cust_id, a.event_type_id, a.bind_attr, month; --- .../java/org/apache/doris/catalog/MTMV.java | 10 +- .../doris/job/extensions/mtmv/MTMVTask.java | 4 +- .../org/apache/doris/mtmv/MTMVPlanUtil.java | 39 ++++-- .../apache/doris/nereids/NereidsPlanner.java | 25 +++- .../doris/nereids/jobs/executor/Rewriter.java | 11 +- .../mv/AbstractMaterializedViewRule.java | 53 +++++++ .../mv/InitMaterializationContextHook.java | 3 +- .../mv/MaterializationContext.java | 2 +- .../exploration/mv/MaterializedViewUtils.java | 53 ++++++- .../mv/PreMaterializedViewRewriter.java | 30 ++-- .../agg_variety/join_conjuncts_eliminate.out | 11 ++ .../derive_mv_oprative.out | 15 ++ .../suites/mv_p0/count_star/count_star.groovy | 2 +- .../mv/agg_on_none_agg/agg_on_none_agg.groovy | 6 +- .../join_conjuncts_eliminate.groovy | 130 ++++++++++++++++++ .../derive_mv_oprative.groovy | 72 ++++++++++ .../pre_rewrite/limit/query_with_limit.groovy | 2 +- .../mv/tpch/mv_tpch_test.groovy | 3 +- 18 files changed, 407 insertions(+), 64 deletions(-) create mode 100644 regression-test/data/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.out create mode 100644 regression-test/data/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.out create mode 100644 regression-test/suites/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.groovy create mode 100644 regression-test/suites/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 1ccb93502c1df9..14571133e0d055 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -198,8 +198,9 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation, if (!isReplay) { ConnectContext currentContext = ConnectContext.get(); // shouldn't do this while holding mvWriteLock - mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true, - true, currentContext); + mtmvCache = MTMVCache.from(this.getQuerySql(), + MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE), + true, true, currentContext); } } catch (Throwable e) { mtmvCache = null; @@ -342,8 +343,9 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws } // Concurrent situations may result in duplicate cache generation, // but we tolerate this in order to prevent nested use of readLock and write MvLock for the table - MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true, - false, connectionContext); + MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(), + MTMVPlanUtil.createMTMVContext(this, MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE), + true, false, connectionContext); writeMvLock(); try { this.cache = mtmvCache; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 861079ad8f59a9..a92754e8aee3ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -182,7 +182,7 @@ public void run() throws JobException { if (LOG.isDebugEnabled()) { LOG.debug("mtmv task run, taskId: {}", super.getTaskId()); } - ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); + ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK); try { if (LOG.isDebugEnabled()) { String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString(); @@ -350,7 +350,7 @@ private void executeWithRetry(Set execPartitionNames, Map refreshPartitionNames, Map tableWithPartKey) throws Exception { - ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); + ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK); StatementContext statementContext = new StatementContext(); for (Entry entry : snapshots.entrySet()) { statementContext.setSnapshot(entry.getKey(), entry.getValue()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java index bff8080a9c3703..c734e7497f9549 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java @@ -55,12 +55,12 @@ import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.qe.ConnectContext; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -70,8 +70,28 @@ public class MTMVPlanUtil { - public static ConnectContext createMTMVContext(MTMV mtmv) { - ConnectContext ctx = createBasicMvContext(null); + // The rules should be disabled when generate MTMV cache + // Because these rules may change the plan structure and cause the plan can not match the mv + // this is mainly for final CBO phase rewrite, pre RBO phase does not need to consider, because + // maintain tmp plan alone for rewrite when pre RBO rewrite + public static final List DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE = ImmutableList.of( + RuleType.COMPRESSED_MATERIALIZE_AGG, + RuleType.COMPRESSED_MATERIALIZE_SORT, + RuleType.ELIMINATE_CONST_JOIN_CONDITION, + RuleType.CONSTANT_PROPAGATION, + RuleType.ADD_DEFAULT_LIMIT, + RuleType.ELIMINATE_JOIN_BY_FK, + RuleType.ELIMINATE_JOIN_BY_UK, + RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, + RuleType.ELIMINATE_GROUP_BY, + RuleType.SALT_JOIN, + RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION + ); + // The rules should be disabled when run MTMV task + public static final List DISABLE_RULES_WHEN_RUN_MTMV_TASK = DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE; + + public static ConnectContext createMTMVContext(MTMV mtmv, List disableRules) { + ConnectContext ctx = createBasicMvContext(null, disableRules); Optional workloadGroup = mtmv.getWorkloadGroup(); if (workloadGroup.isPresent()) { ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get()); @@ -83,7 +103,8 @@ public static ConnectContext createMTMVContext(MTMV mtmv) { return ctx; } - public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext) { + public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext, + List disableRules) { ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); ctx.setCurrentUserIdentity(UserIdentity.ADMIN); @@ -97,16 +118,6 @@ public static ConnectContext createBasicMvContext(@Nullable ConnectContext paren ctx.getSessionVariable().skipStorageEngineMerge = false; ctx.getSessionVariable().showHiddenColumns = false; ctx.getSessionVariable().allowModifyMaterializedViewData = true; - // Rules disabled during materialized view plan generation. These rules can cause significant plan changes, - // which may affect transparent query rewriting by mv - List disableRules = Arrays.asList( - RuleType.COMPRESSED_MATERIALIZE_AGG, - RuleType.COMPRESSED_MATERIALIZE_SORT, - RuleType.ELIMINATE_CONST_JOIN_CONDITION, - RuleType.CONSTANT_PROPAGATION, - RuleType.ADD_DEFAULT_LIMIT, - RuleType.ELIMINATE_GROUP_BY - ); ctx.getSessionVariable().setDisableNereidsRules( disableRules.stream().map(RuleType::name).collect(Collectors.joining(","))); ctx.setStartTime(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index f701ff0edfd4e4..28e7a477f9d91f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -454,13 +454,19 @@ protected void preMaterializedViewRewrite() { LOG.debug("Start pre rewrite plan by mv"); } List tmpPlansForMvRewrite = cascadesContext.getStatementContext().getTmpPlanForMvRewrite(); + Plan originalPlan = cascadesContext.getRewritePlan(); List plansWhichContainMv = new ArrayList<>(); + // because tmpPlansForMvRewrite only one, so timeout is cumulative which is ok for (Plan planForRewrite : tmpPlansForMvRewrite) { - if (!planForRewrite.getLogicalProperties().equals( - cascadesContext.getRewritePlan().getLogicalProperties())) { - continue; - } + SessionVariable sessionVariable = cascadesContext.getConnectContext() + .getSessionVariable(); + int timeoutSecond = sessionVariable.nereidsTimeoutSecond; + boolean enableTimeout = sessionVariable.enableNereidsTimeout; try { + // set mv rewrite timeout + sessionVariable.nereidsTimeoutSecond = PreMaterializedViewRewriter.convertMillisToCeilingSeconds( + sessionVariable.materializedViewRewriteDurationThresholdMs); + sessionVariable.enableNereidsTimeout = true; // pre rewrite Plan rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext, PreMaterializedViewRewriter::rewrite, planForRewrite, planForRewrite, true); @@ -472,10 +478,19 @@ protected void preMaterializedViewRewrite() { if (ruleOptimizedPlan == null) { continue; } - plansWhichContainMv.add(ruleOptimizedPlan); + // after rbo, maybe the plan changed a lot, so we need to normalize it with original plan + Plan normalizedPlan = MaterializedViewUtils.normalizeSinkExpressions( + ruleOptimizedPlan, originalPlan); + if (normalizedPlan != null) { + plansWhichContainMv.add(normalizedPlan); + } } catch (Exception e) { LOG.error("pre mv rewrite in rbo rewrite fail, query id is {}", cascadesContext.getConnectContext().getQueryIdentifier(), e); + + } finally { + sessionVariable.nereidsTimeoutSecond = timeoutSecond; + sessionVariable.enableNereidsTimeout = enableTimeout; } } // clear the rewritten plans which are tmp optimized, should be filled by full optimize later diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index 7eed68241f534e..22c6e4566a2bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -227,8 +227,6 @@ public class Rewriter extends AbstractBatchJobExecutor { // so there may be two filters we need to merge them new MergeFilters() ), - custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION, - AggScalarSubQueryToWindowFunction::new), bottomUp( new EliminateUselessPlanUnderApply(), // CorrelateApplyToUnCorrelateApply and ApplyToJoin @@ -243,13 +241,7 @@ public class Rewriter extends AbstractBatchJobExecutor { * we expected. */ new CorrelateApplyToUnCorrelateApply(), - new ApplyToJoin(), - // UnCorrelatedApplyAggregateFilter rule will create new aggregate outputs, - // The later rule CheckPrivileges which inherent from ColumnPruning - // only works - // if the aggregation node is normalized, so we need call - // NormalizeAggregate here - new NormalizeAggregate() + new ApplyToJoin() ) ), // before `Subquery unnesting` topic, some correlate slots should have appeared at @@ -408,7 +400,6 @@ public class Rewriter extends AbstractBatchJobExecutor { custom(RuleType.ADJUST_NULLABLE, () -> new AdjustNullable(false)) ), topic("add projection for join", - // this is for hint project join rewrite rule custom(RuleType.ADD_PROJECT_FOR_JOIN, AddProjectForJoin::new), topDown(new MergeProjectable()) ) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 4c9b71e81fad66..eab794e06ff95c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -57,21 +57,25 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.types.VariantType; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeUtils; import org.apache.doris.qe.SessionVariable; import org.apache.doris.statistics.Statistics; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.BitSet; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -450,6 +454,10 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca } } trySetStatistics(materializationContext, cascadesContext); + // Derive the operative column for materialized view scan + rewrittenPlan = deriveOperativeColumn(rewrittenPlan, queryStructInfo, + materializationContext.getShuttledExprToScanExprMapping(), viewToQuerySlotMapping, + materializationContext); rewriteResults.add(rewrittenPlan); recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext); // If rewrite successfully, try to clear mv scan currently because it maybe used again @@ -538,6 +546,51 @@ protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInf return tempRewritedPlan; } + /** + * Derive the operative column for materialized view scan, if the operative column in query can be + * represented by the operative column in materialized view, then set the operative column in + * materialized view scan, otherwise return the materialized view scan without operative column + */ + private static Plan deriveOperativeColumn(Plan rewrittenPlan, StructInfo queryStructInfo, + ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping, + MaterializationContext materializationContext) { + ExpressionMapping expressionMappingKeySourceBased = targetExpressionMapping.keyPermute(targetToSourceMapping); + // target to target replacement expression mapping, because mv is 1:1 so get first element + List> flattenExpressionMap = expressionMappingKeySourceBased.flattenMap(); + Map targetToTargetReplacementMappingQueryBased = + flattenExpressionMap.get(0); + final Multimap slotMapping = ArrayListMultimap.create(); + for (Map.Entry entry : targetToTargetReplacementMappingQueryBased.entrySet()) { + if (entry.getValue() instanceof Slot) { + entry.getKey().collect(NamedExpression.class::isInstance).forEach( + namedExpression -> slotMapping.put( + (NamedExpression) namedExpression, (Slot) entry.getValue())); + } + } + Set operativeSlots = new HashSet<>(); + for (CatalogRelation relation : queryStructInfo.getRelations()) { + List relationOperativeSlots = relation.getOperativeSlots(); + if (relationOperativeSlots.isEmpty()) { + continue; + } + for (Slot slot : relationOperativeSlots) { + Collection mvOutputSlots = slotMapping.get(slot); + if (!mvOutputSlots.isEmpty()) { + operativeSlots.addAll(mvOutputSlots); + } + } + } + return rewrittenPlan.accept(new DefaultPlanRewriter() { + @Override + public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, MaterializationContext context) { + if (context.generateMaterializationIdentifier().equals(olapScan.getTable().getFullQualifiers())) { + return olapScan.withOperativeSlots(operativeSlots); + } + return super.visitLogicalOlapScan(olapScan, context); + } + }, materializationContext); + } + /** * Use target expression to represent the source expression. Visit the source expression, * try to replace the source expression with target expression in targetExpressionMapping, if found then diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java index ee13628b4d141f..32bea175472549 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java @@ -278,7 +278,8 @@ private List createSyncMvContexts(OlapTable olapTable, continue; } ConnectContext basicMvContext = MTMVPlanUtil.createBasicMvContext( - cascadesContext.getConnectContext()); + cascadesContext.getConnectContext(), + MTMVPlanUtil.DISABLE_RULES_WHEN_GENERATE_MTMV_CACHE); basicMvContext.setDatabase(meta.getDbName()); MTMVCache mtmvCache = MTMVCache.from(querySql.get(), basicMvContext, true, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java index 523cdfd90b8fa1..3386e4cd6030c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java @@ -276,7 +276,7 @@ protected Statistics normalizeStatisticsColumnExpression(Statistics originalPlan if (sourceExpression != null && targetExpression instanceof NamedExpression && sourceExpression instanceof NamedExpression) { normalizedExpressionMap.put(MaterializedViewUtils.normalizeExpression( - (NamedExpression) sourceExpression, (NamedExpression) targetExpression).toSlot(), + (NamedExpression) sourceExpression, (NamedExpression) targetExpression, false).toSlot(), entry.getValue()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index 2f478a3e666f27..cb62eaf42f194a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -65,10 +65,12 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; import org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector; import org.apache.doris.nereids.util.ExpressionUtils; @@ -350,7 +352,7 @@ public static Plan rewriteByRules( } /** - * Normalize expression such as nullable property and output slot id + * Normalize expression such as nullable property and output slot id when plan in the plan tree */ public static Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) { if (rewrittenPlan.getOutput().size() != originPlan.getOutput().size()) { @@ -359,20 +361,63 @@ public static Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) { // normalize nullable List normalizeProjects = new ArrayList<>(); for (int i = 0; i < originPlan.getOutput().size(); i++) { - normalizeProjects.add(normalizeExpression(originPlan.getOutput().get(i), rewrittenPlan.getOutput().get(i))); + normalizeProjects.add(normalizeExpression(originPlan.getOutput().get(i), + rewrittenPlan.getOutput().get(i), false)); } return new LogicalProject<>(normalizeProjects, rewrittenPlan); } + /** + * Normalize expression such as nullable property and output slot id when plan is on the top of tree + */ + public static Plan normalizeSinkExpressions(Plan rewrittenPlan, Plan originPlan) { + return rewrittenPlan.accept(new DefaultPlanRewriter() { + @Override + public Plan visitLogicalSink(LogicalSink rewrittenPlan, Void context) { + if (rewrittenPlan.getOutput().size() != originPlan.getOutput().size()) { + return null; + } + if (rewrittenPlan.getLogicalProperties().equals(originPlan.getLogicalProperties())) { + return rewrittenPlan; + } + List rewrittenPlanOutputExprList = rewrittenPlan.getOutputExprs(); + List originPlanOutputExprList = originPlan.getOutput(); + if (rewrittenPlanOutputExprList.size() != originPlanOutputExprList.size()) { + return null; + } + List normalizedOutputExprList = new ArrayList<>(); + for (int i = 0; i < rewrittenPlanOutputExprList.size(); i++) { + NamedExpression rewrittenExpression = rewrittenPlanOutputExprList.get(i); + NamedExpression originalExpression = originPlanOutputExprList.get(i); + normalizedOutputExprList.add(normalizeExpression(originalExpression, + rewrittenExpression, true)); + } + LogicalProject project = new LogicalProject<>(normalizedOutputExprList, + rewrittenPlan.child()); + return rewrittenPlan.withChildren(project); + } + }, null); + + } + /** * Normalize expression with query, keep the consistency of exprId and nullable props with * query * Keep the replacedExpression slot property is the same as the sourceExpression */ public static NamedExpression normalizeExpression( - NamedExpression sourceExpression, NamedExpression replacedExpression) { + NamedExpression sourceExpression, NamedExpression replacedExpression, boolean isSink) { Expression innerExpression = replacedExpression; - if (replacedExpression.nullable() != sourceExpression.nullable()) { + boolean isExprEquals = replacedExpression.getExprId().equals(sourceExpression.getExprId()); + boolean isNullableEquals = replacedExpression.nullable() == sourceExpression.nullable(); + if (isExprEquals && isNullableEquals) { + return replacedExpression; + } + if (isExprEquals && isSink && replacedExpression instanceof SlotReference) { + // for sink, if expr id is the same, but nullable is different, should keep the same expr id + return ((SlotReference) replacedExpression).withNullable(sourceExpression.nullable()); + } + if (!isNullableEquals) { // if enable join eliminate, query maybe inner join and mv maybe outer join. // If the slot is at null generate side, the nullable maybe different between query and view // So need to force to consistent. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PreMaterializedViewRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PreMaterializedViewRewriter.java index d6b47cabd63f86..2e645e4abc0795 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PreMaterializedViewRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PreMaterializedViewRewriter.java @@ -70,6 +70,8 @@ public class PreMaterializedViewRewriter { NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.DISTINCT_AGGREGATE_SPLIT.ordinal()); NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.PROCESS_SCALAR_AGG_MUST_USE_MULTI_DISTINCT.ordinal()); NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM.ordinal()); + NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.SALT_JOIN.ordinal()); + NEED_PRE_REWRITE_RULE_TYPES.set(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION.ordinal()); } /** @@ -155,22 +157,6 @@ public static boolean needPreRewrite(CascadesContext cascadesContext) { } return false; } - boolean outputAnyEquals = false; - Plan finalRewritePlan = cascadesContext.getRewritePlan(); - for (Plan tmpPlanForRewrite : statementContext.getTmpPlanForMvRewrite()) { - if (finalRewritePlan.getLogicalProperties().equals(tmpPlanForRewrite.getLogicalProperties())) { - outputAnyEquals = true; - break; - } - } - if (!outputAnyEquals) { - // if tmp plan has no same logical properties to the finalRewritePlan, should not be written in rbo - if (LOG.isDebugEnabled()) { - LOG.debug("does not need pre rewrite, because outputAnyEquals is false, query id is {}", - cascadesContext.getConnectContext().getQueryIdentifier()); - } - return false; - } if (Optimizer.isDpHyp(cascadesContext)) { // dp hyper only support one group expression in each group when init if (LOG.isDebugEnabled()) { @@ -195,6 +181,18 @@ public static boolean needPreRewrite(CascadesContext cascadesContext) { return shouldPreRewrite; } + /** + * convert millis to ceiling seconds + */ + public static int convertMillisToCeilingSeconds(long milliseconds) { + if (milliseconds <= 0) { + return 0; + } + double secondsAsDouble = (double) milliseconds / 1000.0; + double ceilingSeconds = Math.ceil(secondsAsDouble); + return (int) ceilingSeconds; + } + /** * PreRewriteStrategy from materialized view rewrite */ diff --git a/regression-test/data/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.out b/regression-test/data/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.out new file mode 100644 index 00000000000000..3558a15e3ad118 --- /dev/null +++ b/regression-test/data/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !query1_1_before -- +410723002257 1 attr_A 300.00 202509 +410723002257 2 attr_B 400.00 202509 +410723002257 3 attr_C 300.00 202509 + +-- !query1_1_after -- +410723002257 1 attr_A 300.00 202509 +410723002257 2 attr_B 400.00 202509 +410723002257 3 attr_C 300.00 202509 + diff --git a/regression-test/data/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.out b/regression-test/data/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.out new file mode 100644 index 00000000000000..d0c64c31f8e01b --- /dev/null +++ b/regression-test/data/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !query1_0_before -- +a 12 +b 15 +c 19 +d 11 +e 18 + +-- !query1_0_after -- +a 12 +b 15 +c 19 +d 11 +e 18 + diff --git a/regression-test/suites/mv_p0/count_star/count_star.groovy b/regression-test/suites/mv_p0/count_star/count_star.groovy index 9fffb56ee81053..9c5e99afce1048 100644 --- a/regression-test/suites/mv_p0/count_star/count_star.groovy +++ b/regression-test/suites/mv_p0/count_star/count_star.groovy @@ -56,7 +56,7 @@ suite ("count_star") { mv_rewrite_success("select k1,k4,count(*) from d_table group by k1,k4;", "kstar") qt_select_mv "select k1,k4,count(*) from d_table group by k1,k4 order by 1,2;" - mv_rewrite_success("select k1,k4,count(*) from d_table where k1=1 group by k1,k4;", "kstar") + mv_rewrite_success_without_check_chosen("select k1,k4,count(*) from d_table where k1=1 group by k1,k4;", "kstar") qt_select_mv "select k1,k4,count(*) from d_table where k1=1 group by k1,k4 order by 1,2;" mv_rewrite_fail("select k1,k4,count(*) from d_table where k3=1 group by k1,k4;", "kstar") diff --git a/regression-test/suites/nereids_rules_p0/mv/agg_on_none_agg/agg_on_none_agg.groovy b/regression-test/suites/nereids_rules_p0/mv/agg_on_none_agg/agg_on_none_agg.groovy index a4c17707179043..ec3e2c6e6f31da 100644 --- a/regression-test/suites/nereids_rules_p0/mv/agg_on_none_agg/agg_on_none_agg.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/agg_on_none_agg/agg_on_none_agg.groovy @@ -279,7 +279,7 @@ suite("agg_on_none_agg") { bin(o_orderkey); """ order_qt_query3_0_before "${query3_0}" - async_mv_rewrite_success(db, mv3_0, query3_0, "mv3_0") + async_mv_rewrite_success_without_check_chosen(db, mv3_0, query3_0, "mv3_0") order_qt_query3_0_after "${query3_0}" sql """ DROP MATERIALIZED VIEW IF EXISTS mv3_0""" @@ -382,7 +382,7 @@ suite("agg_on_none_agg") { bin(o_orderkey); """ order_qt_query4_0_before "${query4_0}" - async_mv_rewrite_success(db, mv4_0, query4_0, "mv4_0") + async_mv_rewrite_success_without_check_chosen(db, mv4_0, query4_0, "mv4_0") order_qt_query4_0_after "${query4_0}" sql """ DROP MATERIALIZED VIEW IF EXISTS mv4_0""" @@ -411,7 +411,7 @@ suite("agg_on_none_agg") { l_partkey; """ order_qt_query4_1_before "${query4_1}" - async_mv_rewrite_success(db, mv4_1, query4_1, "mv4_1") + async_mv_rewrite_success_without_check_chosen(db, mv4_1, query4_1, "mv4_1") order_qt_query4_1_after "${query4_1}" sql """ DROP MATERIALIZED VIEW IF EXISTS mv4_1""" diff --git a/regression-test/suites/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.groovy b/regression-test/suites/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.groovy new file mode 100644 index 00000000000000..82cac1a2e0ed2b --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/agg_variety/join_conjuncts_eliminate.groovy @@ -0,0 +1,130 @@ +package mv.agg_variety +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("join_conjuncts_eliminate") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "set runtime_filter_mode=OFF"; + sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'" + sql "set pre_materialized_view_rewrite_strategy = TRY_IN_RBO" + + sql """ + drop table if exists customer + """ + + sql """ + CREATE TABLE customer ( + `cust_id` BIGINT NOT NULL COMMENT '', + `cust_name` VARCHAR(100) NULL COMMENT '', + `create_time` DATETIME NULL COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`cust_id`) + DISTRIBUTED BY HASH(`cust_id`) BUCKETS 10 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + + sql """ + drop table if exists data_event + """ + + sql""" + CREATE TABLE data_event ( + `event_id` BIGINT NOT NULL COMMENT '', + `cust_id` BIGINT NULL COMMENT '', + `event_type_id` INT NULL COMMENT '', + `bind_attr` VARCHAR(100) NULL COMMENT '', + `sum_amount` DECIMAL(16,2) NULL COMMENT '', + `created_date` DATETIME NULL COMMENT '', + `create_time` DATETIME NULL COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`event_id`) + DISTRIBUTED BY HASH(`event_id`) BUCKETS 10 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO customer (cust_id, cust_name, create_time) VALUES + (410723002257, '测试客户1', '2025-01-01 00:00:00'), + (410723002258, '测试客户2', '2025-01-01 00:00:00'); + + INSERT INTO data_event (event_id, cust_id, event_type_id, bind_attr, sum_amount, created_date, create_time) VALUES + (1, 410723002257, 1, 'attr_A', 100.00, '2025-09-01 10:00:00', '2025-09-01 10:00:00'), + (2, 410723002257, 1, 'attr_A', 200.00, '2025-09-02 11:00:00', '2025-09-02 11:00:00'), + (3, 410723002257, 2, 'attr_B', 150.00, '2025-09-03 12:00:00', '2025-09-03 12:00:00'), + (4, 410723002257, 2, 'attr_B', 250.00, '2025-09-04 13:00:00', '2025-09-04 13:00:00'), + (5, 410723002257, 3, 'attr_C', 300.00, '2025-09-05 14:00:00', '2025-09-05 14:00:00'), + (6, 410723002258, 1, 'attr_A', 50.00, '2025-09-01 15:00:00', '2025-09-01 15:00:00'), + (7, 410723002258, 2, 'attr_B', 75.00, '2025-09-02 16:00:00', '2025-09-02 16:00:00'), + (8, 410723002257, 1, 'attr_A', 400.00, '2025-08-01 10:00:00', '2025-08-01 10:00:00'), + (9, 410723002257, 2, 'attr_B', 500.00, '2025-10-01 10:00:00', '2025-10-01 10:00:00'); + """ + + sql "ANALYZE TABLE customer WITH SYNC" + sql "ANALYZE TABLE data_event WITH SYNC" + + def query_sql = """ + select + a.cust_id, + a.event_type_id, + a.bind_attr, + sum(sum_amount) sum_amount, + date_format(created_date, '%Y%m') month + from data_event a + inner join customer b on (a.cust_id = b.cust_id) + where a.cust_id = 410723002257 + and date_format(a.created_date, '%Y%m') = '202509' + group by a.cust_id, a.event_type_id, a.bind_attr, month; + """ + + create_async_mv(db, "mv1", """select + a.cust_id, + a.event_type_id, + a.bind_attr, + sum(sum_amount) sum_amount, + date_format(a.created_date, '%Y%m') month, + a.created_date + from data_event a + join customer b on(a.cust_id=b.cust_id) + group by a.cust_id, a.event_type_id, a.bind_attr, month, a.created_date; + """) + + create_async_mv(db, "mv2", """ + select + a.cust_id, + a.event_type_id, + a.bind_attr, + sum(sum_amount) sum_amount, + date_format(a.created_date, '%Y%m') month + from data_event a + join customer b on(a.cust_id=b.cust_id) + group by a.cust_id, a.event_type_id, a.bind_attr, month; + """) + + + order_qt_query1_1_before "${query_sql}" + mv_rewrite_all_success_without_check_chosen(query_sql, ["mv1", "mv2"]) + order_qt_query1_1_after "${query_sql}" + + sql "DROP MATERIALIZED VIEW IF EXISTS mv1" + sql "DROP MATERIALIZED VIEW IF EXISTS mv2" +} diff --git a/regression-test/suites/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.groovy b/regression-test/suites/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.groovy new file mode 100644 index 00000000000000..efd00e6cc2be4a --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/derive_mv_operative/derive_mv_oprative.groovy @@ -0,0 +1,72 @@ +package mv.derive_mv_operative +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("derive_mv_oprative") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "set runtime_filter_mode=OFF"; + sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'" + sql "set pre_materialized_view_rewrite_strategy = NOT_IN_RBO" + + sql """ + drop table if exists part + """ + + sql """ + CREATE TABLE `part` ( + `p_partkey` INT(11) NOT NULL COMMENT '', + `p_name` VARCHAR(255) NOT NULL COMMENT '', + `p_size` INT(11) NOT NULL COMMENT '' + ) + ENGINE=OLAP + DUPLICATE KEY(`p_partkey`) + COMMENT '部件信息表' + DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO `part` (p_partkey, p_name, p_size) + VALUES + (1, 'a', 12), + (2, 'b', 15), + (3, 'c', 19), + (4, 'd', 11), + (5, 'e', 18), + (6, 'f', 5), + (7, 'g', 25), + (8, 'h', 16), + (9, 'i', 10), + (10, 'j', 14); + """ + + sql """alter table part modify column p_name set stats ('row_count'='10');""" + + def mv1_0 = """ + SELECT p_name, p_size FROM part WHERE p_size > 10 AND p_size < 20 ORDER BY p_name; + """ + def query1_0 = """ + SELECT p_name, p_size FROM part WHERE p_size > 10 AND p_size < 20 ORDER BY p_name limit 5; + """ + order_qt_query1_0_before "${query1_0}" + async_mv_rewrite_success(db, mv1_0, query1_0, "operative_mv") + order_qt_query1_0_after "${query1_0}" + sql """ DROP MATERIALIZED VIEW IF EXISTS operative_mv""" +} diff --git a/regression-test/suites/nereids_rules_p0/mv/pre_rewrite/limit/query_with_limit.groovy b/regression-test/suites/nereids_rules_p0/mv/pre_rewrite/limit/query_with_limit.groovy index f3db2c06aa9f2c..1f4b5f383f5f21 100644 --- a/regression-test/suites/nereids_rules_p0/mv/pre_rewrite/limit/query_with_limit.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/pre_rewrite/limit/query_with_limit.groovy @@ -371,7 +371,7 @@ suite("query_with_limit") { l_suppkey limit 2; """ - async_mv_rewrite_success(db, mv1_5_0, query1_5_0, "mv1_5_0", [TRY_IN_RBO, FORCE_IN_RBO]) + async_mv_rewrite_success_without_check_chosen(db, mv1_5_0, query1_5_0, "mv1_5_0", [TRY_IN_RBO, FORCE_IN_RBO]) async_mv_rewrite_fail(db, mv1_5_0, query1_5_0, "mv1_5_0", [NOT_IN_RBO]) sql """ DROP MATERIALIZED VIEW IF EXISTS mv1_5_0""" diff --git a/regression-test/suites/nereids_rules_p0/mv/tpch/mv_tpch_test.groovy b/regression-test/suites/nereids_rules_p0/mv/tpch/mv_tpch_test.groovy index ffb2bd5a6d91c8..d399207fa56067 100644 --- a/regression-test/suites/nereids_rules_p0/mv/tpch/mv_tpch_test.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/tpch/mv_tpch_test.groovy @@ -1232,8 +1232,7 @@ suite("mv_tpch_test") { ) """ // contains subquery, doesn't support now - order_qt_query17_before "${query17}" - async_mv_rewrite_fail(db, mv17, query17, "mv17") + async_mv_rewrite_success(db, mv17, query17, "mv17") order_qt_query17_after "${query17}" sql """ DROP MATERIALIZED VIEW IF EXISTS mv17"""