diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 3a6f08f8cc8a1f..c6c44b9fb4a920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -198,7 +198,7 @@ protected List getValidQueryStructInfos(Plan queryPlan, CascadesCont * only one materialization every time. Different query pattern should override the sub logic. */ protected List doRewrite(StructInfo queryStructInfo, CascadesContext cascadesContext, - MaterializationContext materializationContext) { + MaterializationContext materializationContext) throws AnalysisException { List rewriteResults = new ArrayList<>(); StructInfo viewStructInfo = materializationContext.getStructInfo(); MatchMode matchMode = decideMatchMode(queryStructInfo.getRelations(), viewStructInfo.getRelations()); @@ -295,7 +295,7 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca continue; } Pair>, Map>> invalidPartitions; - if (PartitionCompensator.needUnionRewrite(materializationContext) + if (PartitionCompensator.needUnionRewrite(materializationContext, cascadesContext.getStatementContext()) && sessionVariable.isEnableMaterializedViewUnionRewrite()) { MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java index 3c086de0919cff..f17d5364ff0a45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java @@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.Plan; @@ -151,26 +153,49 @@ public static boolean needUnionRewrite( /** * Check if need union compensate or not + * If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST, should not do union compensate + * because it means query all partitions from base table and prune partition failed */ - public static boolean needUnionRewrite(MaterializationContext materializationContext) { + public static boolean needUnionRewrite(MaterializationContext materializationContext, + StatementContext statementContext) throws AnalysisException { if (!(materializationContext instanceof AsyncMaterializationContext)) { return false; } MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); - PartitionType type = mtmv.getPartitionInfo().getType(); BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo(); - return !PartitionType.UNPARTITIONED.equals(type) && relatedTableInfo != null; + if (relatedTableInfo == null) { + return false; + } + if (PartitionType.UNPARTITIONED.equals(mtmv.getPartitionInfo().getType())) { + return false; + } + MTMVRelatedTableIf pctTable = mtmv.getMvPartitionInfo().getRelatedTable(); + Multimap, Pair>> tableUsedPartitionNameMap = + statementContext.getTableUsedPartitionNameMap(); + if (pctTable instanceof ExternalTable && !((ExternalTable) pctTable).supportInternalPartitionPruned()) { + // if pct table is external table and not support internal partition pruned, + // we consider query all partitions from pct table, this would cause loop union compensate, + // so we skip union compensate in this case + return false; + } + Collection>> tableUsedPartitions + = tableUsedPartitionNameMap.get(pctTable.getFullQualifiers()); + // If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST, + // should not do union compensate, because it means query all partitions from base table + // and prune partition failed + return !ALL_PARTITIONS_LIST.equals(tableUsedPartitions) + && tableUsedPartitions.stream().noneMatch(ALL_PARTITIONS::equals); } /** * Get query used partitions * this is calculated from tableUsedPartitionNameMap and tables in statementContext * - * @param customRelationIdSet if union compensate occurs, the new query used partitions is changed, + * @param currentUsedRelationIdSet if union compensate occurs, the new query used partitions is changed, * so need to get used partitions by relation id set */ public static Map, Set> getQueryUsedPartitions(StatementContext statementContext, - BitSet customRelationIdSet) { + BitSet currentUsedRelationIdSet) { // get table used partitions // if table is not in statementContext().getTables() which means the table is partition prune as empty relation Multimap, Pair>> tableUsedPartitionNameMap = statementContext @@ -195,6 +220,13 @@ public static Map, Set> getQueryUsedPartitions(StatementCon queryUsedRelatedTablePartitionsMap.put(queryUsedTable, null); continue tableLoop; } + // If currentUsedRelationIdSet is not empty, need check relation id to get concrete used partitions + BitSet usedPartitionRelation = new BitSet(); + usedPartitionRelation.set(tableUsedPartitionPair.key().asInt()); + if (!currentUsedRelationIdSet.isEmpty() + && !currentUsedRelationIdSet.intersects(usedPartitionRelation)) { + continue; + } usedPartitionSet.addAll(tableUsedPartitionPair.value()); } queryUsedRelatedTablePartitionsMap.put(queryUsedTable, usedPartitionSet); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java index 25c0a679d8d388..5efdff2b12ed70 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java @@ -17,16 +17,30 @@ package org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.mtmv.BaseTableInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.utframe.TestWithFeService; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.util.BitSet; import java.util.List; @@ -190,4 +204,149 @@ public void testGetAllTableUsedPartitionList() { Assertions.assertEquals(orderTableUsedPartition, ImmutableSet.of("p1", "p2", "p3", "p4")); }); } + + @Test + public void testNeedUnionRewriteExternalNoPrune() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + newBaseTableInfo(), + Mockito.mock(MTMVRelatedTableIf.class), + true); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNeedUnionRewritePositive() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + newBaseTableInfo(), + Mockito.mock(MTMVRelatedTableIf.class), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception { + BaseTableInfo tableInfo = newBaseTableInfo(); + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + tableInfo, + Mockito.mock(MTMVRelatedTableIf.class), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + + ArrayListMultimap, Pair>> t = ArrayListMultimap.create(); + t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testGetQueryUsedPartitionsAllAndPartial() { + // Prepare qualifiers + List lineitemQualifier = ImmutableList.of( + "internal", "partition_compensate_test", "lineitem_list_partition"); + List ordersQualifier = ImmutableList.of( + "internal", "partition_compensate_test", "orders_list_partition"); + + Multimap, Pair>> tableUsedPartitionNameMap + = connectContext.getStatementContext().getTableUsedPartitionNameMap(); + tableUsedPartitionNameMap.clear(); + + tableUsedPartitionNameMap.put(lineitemQualifier, PartitionCompensator.ALL_PARTITIONS); + + RelationId ridA = new RelationId(1); + RelationId ridB = new RelationId(2); + tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA, ImmutableSet.of("p1", "p2"))); + tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB, ImmutableSet.of("p3"))); + + Map, Set> result = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertNull(result.get(lineitemQualifier)); // all partitions + Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"), result.get(ordersQualifier)); + + BitSet filterRidA = new BitSet(); + filterRidA.set(ridA.asInt()); + Map, Set> resultRidA = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), filterRidA); + Assertions.assertNull(resultRidA.get(lineitemQualifier)); + Assertions.assertEquals(ImmutableSet.of("p1", "p2"), resultRidA.get(ordersQualifier)); + + BitSet filterRidB = new BitSet(); + filterRidB.set(ridB.asInt()); + Map, Set> resultRidB = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), filterRidB); + Assertions.assertNull(resultRidB.get(lineitemQualifier)); + Assertions.assertEquals(ImmutableSet.of("p3"), resultRidB.get(ordersQualifier)); + + tableUsedPartitionNameMap.put(ordersQualifier, PartitionCompensator.ALL_PARTITIONS); + Map, Set> resultAllOrders = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertNull(resultAllOrders.get(ordersQualifier)); + } + + @Test + public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() { + List qualifier = ImmutableList.of( + "internal", "partition_compensate_test", "lineitem_list_partition"); + Multimap, Pair>> tableUsedPartitionNameMap + = connectContext.getStatementContext().getTableUsedPartitionNameMap(); + tableUsedPartitionNameMap.clear(); + // Put an empty set via a distinct relation id to simulate no partitions used + RelationId rid = new RelationId(3); + tableUsedPartitionNameMap.put(qualifier, Pair.of(rid, ImmutableSet.of())); + + Map, Set> result = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier)); + } + + private static MaterializationContext mockCtx( + PartitionType type, + BaseTableInfo pctInfo, + MTMVRelatedTableIf pctTable, + boolean externalNoPrune) throws AnalysisException { + + MTMV mtmv = Mockito.mock(MTMV.class); + PartitionInfo pi = Mockito.mock(PartitionInfo.class); + Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi); + Mockito.when(pi.getType()).thenReturn(type); + + MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class); + Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi); + Mockito.when(mpi.getRelatedTableInfo()).thenReturn(pctInfo); + Mockito.when(mpi.getRelatedTable()).thenReturn(pctTable); + + if (externalNoPrune) { + HMSExternalTable ext = Mockito.mock(HMSExternalTable.class); + Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false); + Mockito.when(mpi.getRelatedTable()).thenReturn(ext); + } + + AsyncMaterializationContext ctx = Mockito.mock(AsyncMaterializationContext.class); + Mockito.when(ctx.getMtmv()).thenReturn(mtmv); + return ctx; + } + + private static BaseTableInfo newBaseTableInfo() { + CatalogIf catalog = Mockito.mock(CatalogIf.class); + Mockito.when(catalog.getId()).thenReturn(1L); + Mockito.when(catalog.getName()).thenReturn("internal"); + + DatabaseIf db = Mockito.mock(DatabaseIf.class); + Mockito.when(db.getId()).thenReturn(2L); + Mockito.when(db.getFullName()).thenReturn("partition_compensate_test"); + Mockito.when(db.getCatalog()).thenReturn(catalog); + + TableIf table = Mockito.mock(TableIf.class); + Mockito.when(table.getId()).thenReturn(3L); + Mockito.when(table.getName()).thenReturn("t"); + Mockito.when(table.getDatabase()).thenReturn(db); + + return new BaseTableInfo(table); + } } diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy index 70a11d3633acf4..680f7eaa93d48c 100644 --- a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy +++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy @@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_one_partition "SELECT * FROM ${mvName} " - def explainOnePartition = sql """ explain ${mvSql} """ - logger.info("explainOnePartition: " + explainOnePartition.toString()) - assertTrue(explainOnePartition.toString().contains("VUNION")) + mv_rewrite_success(mvSql, mvName) order_qt_refresh_one_partition_rewrite "${mvSql}" mv_rewrite_success("${mvSql}", "${mvName}") @@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_auto "SELECT * FROM ${mvName} " - def explainAllPartition = sql """ explain ${mvSql}; """ - logger.info("explainAllPartition: " + explainAllPartition.toString()) - assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + mv_rewrite_success(mvSql, mvName) order_qt_refresh_all_partition_rewrite "${mvSql}" mv_rewrite_success("${mvSql}", "${mvName}")