diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 68de384983d29d..748a8885ecc789 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -18,7 +18,11 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.alter.MaterializedViewHandler;
+import org.apache.doris.analysis.AggregateInfo;
 import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.backup.Status;
 import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
@@ -1577,4 +1581,52 @@ public TStorageFormat getStorageFormat() {
         }
         return tableProperty.getStorageFormat();
     }
+
+    /**
+     * Returns whether the given aggregation meets this table's data distribution
+     * requirements, in which case the planner can run the aggregation inside the
+     * scan fragment instead of adding a separate merge aggregation fragment.
+     *
+     * For a non-partitioned table:
+     *   The table's distribution hash columns must be a subset of the aggregate columns.
+     *
+     * For a partitioned table:
+     *   1. The table's partition columns must be a subset of the table's hash columns.
+     *   2. The table's distribution hash columns must be a subset of the aggregate columns.
+     *
+     * @param aggregateInfo the aggregation whose grouping/partition exprs are checked
+     * @return true if the requirements above are met, false otherwise
+     */
+    public boolean meetAggDistributionRequirements(AggregateInfo aggregateInfo) {
+        List<Expr> groupingExps = aggregateInfo.getGroupingExprs();
+        if (groupingExps == null || groupingExps.isEmpty()) {
+            return false;
+        }
+        List<Expr> partitionExps = aggregateInfo.getPartitionExprs() != null
+                ? aggregateInfo.getPartitionExprs() : groupingExps;
+        DistributionInfo distribution = getDefaultDistributionInfo();
+        if (!(distribution instanceof HashDistributionInfo)) {
+            return false;
+        }
+        List<Column> distributeColumns = ((HashDistributionInfo) distribution).getDistributionColumns();
+        PartitionInfo partitionInfo = getPartitionInfo();
+        if (partitionInfo instanceof RangePartitionInfo) {
+            List<Column> rangeColumns = ((RangePartitionInfo) partitionInfo).getPartitionColumns();
+            if (!distributeColumns.containsAll(rangeColumns)) {
+                return false;
+            }
+        }
+        List<Column> hashColumns = new ArrayList<>(partitionExps.size());
+        for (Expr expr : partitionExps) {
+            SlotRef slot = expr.unwrapSlotRef();
+            SlotDescriptor desc = (slot == null) ? null : slot.getDesc();
+            if (desc == null) {
+                // Not a plain column reference (or no descriptor attached yet):
+                // the expr cannot be matched against the distribution columns.
+                return false;
+            }
+            hashColumns.add(desc.getColumn());
+        }
+        return hashColumns.containsAll(distributeColumns);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 94c429d81ea888..4eb9d709a70665 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -825,7 +825,16 @@ private PlanFragment createAggregationFragment(
         if (isDistinct) {
             return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
         } else {
-            return createMergeAggregationFragment(node, childFragment);
+            // If the scanned table's distribution already satisfies the aggregation,
+            // aggregate directly in the scan fragment and skip the merge fragment. See #4481.
+            PlanNode childPlan = childFragment.getPlanRoot();
+            if (childPlan instanceof OlapScanNode
+                    && ((OlapScanNode) childPlan).getOlapTable()
+                            .meetAggDistributionRequirements(node.getAggInfo())) {
+                childFragment.addPlanRoot(node);
+                return childFragment;
+            }
+            return createMergeAggregationFragment(node, childFragment);
         }
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 006a402f048ab8..98dac0dccc90ca 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -308,6 +308,30 @@ public static void beforeClass() throws Exception {
                 "\"database\" = \"db1\",\n" +
                 "\"table\" = \"tbl1\"\n" +
                 ");");
+
+        createTable("CREATE TABLE test.`table_partitioned` (\n" +
+                " `dt` int(11) NOT NULL COMMENT \"\",\n" +
+                " `dis_key` varchar(20) NOT NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`dt`, `dis_key`)\n" +
+                "PARTITION BY RANGE(`dt`)\n" +
+                "(PARTITION p20200101 VALUES [(\"-1\"), (\"20200101\")),\n" +
+                "PARTITION p20200201 VALUES [(\"20200101\"), (\"20200201\")))\n" +
+                "DISTRIBUTED BY HASH(`dt`, `dis_key`) BUCKETS 2\n" +
+                "PROPERTIES (\n" +
+                "\"replication_num\" = \"1\"\n" +
+                ");");
+
+        createTable("CREATE TABLE test.`table_unpartitioned` (\n" +
+                " `dt` int(11) NOT NULL COMMENT \"\",\n" +
+                " `dis_key` varchar(20) NOT NULL COMMENT \"\"\n" +
+                ") ENGINE=OLAP\n" +
+                "DUPLICATE KEY(`dt`, `dis_key`)\n" +
+                "COMMENT \"OLAP\"\n" +
+                "DISTRIBUTED BY HASH(`dt`, `dis_key`) BUCKETS 2\n" +
+                "PROPERTIES (\n" +
+                "\"replication_num\" = \"1\"\n" +
+                ");");
     }
 
     @AfterClass
@@ -1037,4 +1061,23 @@ public void testInformationFunctions() throws Exception {
         infoFunc.analyze(analyzer);
         Assert.assertEquals("'root'@'%'", infoFunc.getStrValue());
     }
+
+    @Test
+    public void testAggregateSatisfyOlapTableDistribution() throws Exception {
+        // runningUnitTest is a static flag shared by every test: restore it afterwards
+        // so that this test cannot change the behavior of tests that run after it.
+        boolean previousRunningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = true;
+        try {
+            connectContext.setDatabase("default_cluster:test");
+            String[] tables = {"table_unpartitioned", "table_partitioned"};
+            for (String table : tables) {
+                String sql = "SELECT dt, dis_key, COUNT(1) FROM " + table + " group by dt, dis_key";
+                String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
+                Assert.assertTrue(explainString, explainString.contains("AGGREGATE (update finalize)"));
+            }
+        } finally {
+            FeConstants.runningUnitTest = previousRunningUnitTest;
+        }
+    }
 }