From d5dbe1f0c4cfd235bfd73efa6b65fe8bdda0370c Mon Sep 17 00:00:00 2001 From: seawinde Date: Tue, 19 Nov 2024 16:13:40 +0800 Subject: [PATCH] [fix](mtmv) Fix get mv read lock too late when rewritten by materialized view (#44164) Problem Summary: When materialized view is rewritten, it would use the mv metadata. Should try to get read lock before use these metadata. or it would cause error. Such as mv def is as following CREATE MATERIALIZED VIEW mv1 BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') AS select o_orderdate, o_shippriority, o_comment, o.o_code as o_o_code, l_orderkey, l_partkey, l.o_code as l_o_code from orders_same_col o left join lineitem_same_col l on l_orderkey = o_orderkey left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey; When handling transparent rewriting, a MV scan plan is used for the transparent rewrite. During the initialization of the scan plan, the partitions of the table are retrieved, so it is necessary to attempt to acquire a read lock on the table during initialization. If the read lock is not acquired, subsequent operations may add or delete partitions, and in the later processing of table partitions, calling get Partition may not retrieve the corresponding partition, leading to data errors. --- .../mv/AbstractMaterializedViewRule.java | 12 ++--- .../mv/AsyncMaterializationContext.java | 14 ++++-- .../mv/MaterializationContext.java | 48 +++++++++--------- .../mv/SyncMaterializationContext.java | 25 ++++++---- .../doris/nereids/mv/IdStatisticsMapTest.java | 2 +- .../mv/join/left_outer/outer_join.out | 46 +++++++++++++++++ .../mv/join/left_outer/outer_join.groovy | 49 +++++++++++++++++++ 7 files changed, 150 insertions(+), 46 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 60b5c58d4c50df..8e9ef1eaa97b7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -234,7 +234,7 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca continue; } Plan rewrittenPlan; - Plan mvScan = materializationContext.getScanPlan(queryStructInfo); + Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext); Plan queryPlan = queryStructInfo.getTopPlan(); if (compensatePredicates.isAlwaysTrue()) { rewrittenPlan = mvScan; @@ -262,12 +262,6 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca // Rewrite query by view rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext, cascadesContext); - // If rewrite successfully, try to get mv read lock to avoid data inconsistent, - // try to get lock which should added before RBO - if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) { - cascadesContext.getStatementContext() - .addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv()); - } rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext, childContext -> { Rewriter.getWholeTreeRewriter(childContext).execute(); @@ -379,9 +373,9 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca } trySetStatistics(materializationContext, cascadesContext); rewriteResults.add(rewrittenPlan); - // if rewrite successfully, try to regenerate mv scan because it maybe used again - materializationContext.tryReGenerateScanPlan(cascadesContext); recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext); + // If rewrite successfully, try to clear mv scan currently because it maybe used again + materializationContext.clearScanPlan(cascadesContext); } return rewriteResults; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java index 0d88672fed64de..96d37ad546a7b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java @@ -57,9 +57,7 @@ public class AsyncMaterializationContext extends MaterializationContext { */ public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List baseTables, List
baseViews, CascadesContext cascadesContext, StructInfo structInfo) { - super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, mtmv.getBaseIndexId(), - mtmv.getPartitionIds(), PreAggStatus.on(), cascadesContext), - cascadesContext, structInfo); + super(mvPlan, mvOriginalPlan, cascadesContext, structInfo); this.mtmv = mtmv; } @@ -110,7 +108,7 @@ public Optional> getPlanStatistics(CascadesContext cascades return Optional.empty(); } RelationId relationId = null; - Optional logicalOlapScan = this.getScanPlan(null) + Optional logicalOlapScan = this.getScanPlan(null, cascadesContext) .collectFirst(LogicalOlapScan.class::isInstance); if (logicalOlapScan.isPresent()) { relationId = logicalOlapScan.get().getRelationId(); @@ -132,7 +130,13 @@ boolean isFinalChosen(Relation relation) { } @Override - public Plan getScanPlan(StructInfo queryInfo) { + public Plan getScanPlan(StructInfo queryInfo, CascadesContext cascadesContext) { + // If try to get scan plan or rewrite successfully, try to get mv read lock to avoid meta data inconsistent, + // try to get lock which should added before RBO + if (!this.isSuccess()) { + cascadesContext.getStatementContext().addTableReadLock(this.getMtmv()); + } + super.getScanPlan(queryInfo, cascadesContext); return scanPlan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java index df535d59d87399..38eba2ac3406ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java @@ -105,22 +105,13 @@ public abstract class MaterializationContext { /** * MaterializationContext, this contains necessary info for query rewriting by materialization */ - public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan, + public MaterializationContext(Plan plan, Plan originalPlan, CascadesContext cascadesContext, StructInfo structInfo) { this.plan = plan; this.originalPlan = originalPlan; - this.scanPlan = scanPlan; - StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement(); this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain() && ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel(); - List originalPlanOutput = originalPlan.getOutput(); - List scanPlanOutput = this.scanPlan.getOutput(); - if (originalPlanOutput.size() == scanPlanOutput.size()) { - for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) { - this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex)); - } - } // Construct materialization struct info, catch exception which may cause planner roll back this.structInfo = structInfo == null ? constructStructInfo(plan, originalPlan, cascadesContext, new BitSet()).orElseGet(() -> null) @@ -128,10 +119,6 @@ public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan, this.available = this.structInfo != null; if (available) { this.planOutputShuttledExpressions = this.structInfo.getPlanOutputShuttledExpressions(); - // materialization output expression shuttle, this will be used to expression rewrite - this.shuttledExprToScanExprMapping = ExpressionMapping.generate( - this.planOutputShuttledExpressions, - scanPlanOutput); } } @@ -176,17 +163,19 @@ public void addMatchedGroup(GroupId groupId, boolean rewriteSuccess) { * if MaterializationContext is already rewritten successfully, then should generate new scan plan in later * query rewrite, because one plan may hit the materialized view repeatedly and the materialization scan output * should be different. - * This method should be called when query rewrite successfully */ - public void tryReGenerateScanPlan(CascadesContext cascadesContext) { + public void tryGenerateScanPlan(CascadesContext cascadesContext) { + if (!this.isAvailable()) { + return; + } this.scanPlan = doGenerateScanPlan(cascadesContext); - // materialization output expression shuttle, this will be used to expression rewrite - this.shuttledExprToScanExprMapping = ExpressionMapping.generate( - this.planOutputShuttledExpressions, - this.scanPlan.getOutput()); + // Materialization output expression shuttle, this will be used to expression rewrite + List scanPlanOutput = this.scanPlan.getOutput(); + this.shuttledExprToScanExprMapping = ExpressionMapping.generate(this.planOutputShuttledExpressions, + scanPlanOutput); + // This is used by normalize statistics column expression Map regeneratedMapping = new HashMap<>(); List originalPlanOutput = originalPlan.getOutput(); - List scanPlanOutput = this.scanPlan.getOutput(); if (originalPlanOutput.size() == scanPlanOutput.size()) { for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) { regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex)); @@ -195,6 +184,17 @@ public void tryReGenerateScanPlan(CascadesContext cascadesContext) { this.exprToScanExprMapping = regeneratedMapping; } + /** + * Should clear scan plan after materializationContext is already rewritten successfully, + * Because one plan may hit the materialized view repeatedly and the materialization scan output + * should be different. + */ + public void clearScanPlan(CascadesContext cascadesContext) { + this.scanPlan = null; + this.shuttledExprToScanExprMapping = null; + this.exprToScanExprMapping = null; + } + public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) { queryToMaterializationSlotMappingCache.put(relationMapping, slotMapping); } @@ -275,7 +275,11 @@ public Plan getOriginalPlan() { return originalPlan; } - public Plan getScanPlan(StructInfo queryStructInfo) { + public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) { + if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null + || this.exprToScanExprMapping == null) { + tryGenerateScanPlan(cascadesContext); + } return scanPlan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java index 47b01385ac1646..e27b3d5174391f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PreAggStatus; import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; import org.apache.doris.nereids.trees.plans.algebra.Relation; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; @@ -55,9 +56,7 @@ public class SyncMaterializationContext extends MaterializationContext { */ public SyncMaterializationContext(Plan mvPlan, Plan mvOriginalPlan, OlapTable olapTable, long indexId, String indexName, CascadesContext cascadesContext, Statistics statistics) { - super(mvPlan, mvOriginalPlan, - MaterializedViewUtils.generateMvScanPlan(olapTable, indexId, olapTable.getPartitionIds(), - PreAggStatus.unset(), cascadesContext), cascadesContext, null); + super(mvPlan, mvOriginalPlan, cascadesContext, null); this.olapTable = olapTable; this.indexId = indexId; this.indexName = indexName; @@ -100,7 +99,7 @@ String getStringInfo() { @Override Optional> getPlanStatistics(CascadesContext cascadesContext) { RelationId relationId = null; - Optional scanObj = this.getScanPlan(null) + Optional scanObj = this.getScanPlan(null, cascadesContext) .collectFirst(LogicalOlapScan.class::isInstance); if (scanObj.isPresent()) { relationId = scanObj.get().getRelationId(); @@ -109,19 +108,27 @@ Optional> getPlanStatistics(CascadesContext cascadesContext } @Override - public Plan getScanPlan(StructInfo queryStructInfo) { + public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) { + // Already get lock if sync mv, doesn't need to get lock + super.getScanPlan(queryStructInfo, cascadesContext); if (queryStructInfo == null) { return scanPlan; } - if (queryStructInfo.getRelations().size() == 1 - && queryStructInfo.getRelations().get(0) instanceof LogicalOlapScan - && !((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds().isEmpty()) { + List queryStructInfoRelations = queryStructInfo.getRelations(); + if (queryStructInfoRelations.size() == 1 + && queryStructInfoRelations.get(0) instanceof LogicalOlapScan + && !((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds().isEmpty()) { // Partition prune if sync materialized view return scanPlan.accept(new DefaultPlanRewriter() { @Override public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Void context) { + if (!queryStructInfoRelations.get(0).getTable().getFullQualifiers().equals( + olapScan.getTable().getFullQualifiers())) { + // Only the same table, we can do partition prue + return olapScan; + } return olapScan.withSelectedPartitionIds( - ((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds()); + ((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds()); } }, null); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java index a4c05fa81e6d0c..0090982db00898 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java @@ -76,7 +76,7 @@ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean isMVPar .rewrite(); // scan plan output will be refreshed after mv rewrite successfully, so need tmp store Set materializationScanOutput = c1.getMaterializationContexts().get(0) - .getScanPlan(null).getOutputSet(); + .getScanPlan(null, c1).getOutputSet(); tmpPlanChecker .optimize() .printlnBestPlanTree(); diff --git a/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out b/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out index 1a1b846054bbce..b8e78048d8e9ff 100644 --- a/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out +++ b/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out @@ -373,3 +373,49 @@ 2023-12-12 2 mi 108 2 2023-12-12 2 mi 108 2 +-- !query12_0_before -- +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 97 4 +2023-12-09 1 yy 97 4 +2023-12-10 1 yy 100 2 +2023-12-10 1 yy 101 2 +2023-12-10 1 yy 98 2 +2023-12-10 1 yy 99 2 +2023-12-11 2 mm 102 3 +2023-12-11 2 mm 103 3 +2023-12-11 2 mm 104 3 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 108 2 +2023-12-12 2 mi 108 2 + +-- !query12_0_after -- +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 97 4 +2023-12-09 1 yy 97 4 +2023-12-10 1 yy 100 2 +2023-12-10 1 yy 101 2 +2023-12-10 1 yy 98 2 +2023-12-10 1 yy 99 2 +2023-12-11 2 mm 102 3 +2023-12-11 2 mm 103 3 +2023-12-11 2 mm 104 3 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 108 2 +2023-12-12 2 mi 108 2 + diff --git a/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy b/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy index f31a1a77978cb4..faa2c747a837c5 100644 --- a/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy @@ -759,4 +759,53 @@ suite("outer_join") { async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0") order_qt_query11_0_after "${query11_0}" sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0""" + + + def mv12_0 = """ + select + o_orderdate, + o_shippriority, + o_comment, + o.o_code as o_o_code, + l_orderkey, + l_partkey, + l.o_code as l_o_code + from + orders_same_col o left + join lineitem_same_col l on l_orderkey = o_orderkey + left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey; + """ + + def query12_0 = """ + select + o_orderdate, + o_shippriority, + o_comment, + o.o_code + l_orderkey, + l_partkey + from + orders_same_col o left + join lineitem_same_col l on l_orderkey = o_orderkey + left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey + where l.o_code <> '91' + union all + select + o_orderdate, + o_shippriority, + o_comment, + o.o_code + l_orderkey, + l_partkey + from + orders_same_col o left + join lineitem_same_col l on l_orderkey = o_orderkey + left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey + where l.o_code = '92'; + """ + + order_qt_query12_0_before "${query12_0}" + async_mv_rewrite_success(db, mv12_0, query12_0, "mv12_0") + order_qt_query12_0_after "${query12_0}" + sql """ DROP MATERIALIZED VIEW IF EXISTS mv12_0""" }