From 123dc155e122404b2a72ea56ee11c391028c0b7f Mon Sep 17 00:00:00 2001 From: liulijia Date: Sat, 29 Aug 2020 00:08:03 +0800 Subject: [PATCH 1/2] Do not add exchange when table's distribution satisfies the distribution requirements For #4481. In DistributedPlanner, do not add the unnecessary Exchanges. For case 1, we only need to check that the table's distribute hash keys are a subset of the aggregate keys. For case 2, we should check two conditions: - partition keys are also hash keys. - the table's distribute hash keys are a subset of the aggregate keys. --- .../org/apache/doris/catalog/OlapTable.java | 30 ++++++++++++++ .../doris/planner/DistributedPlanner.java | 8 ++++ .../apache/doris/planner/QueryPlanTest.java | 39 +++++++++++++++++++ 3 files changed, 77 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 68de384983d29d..8ac0e82f9deb04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -19,6 +19,9 @@ import org.apache.doris.alter.MaterializedViewHandler; import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.backup.Status; import org.apache.doris.backup.Status.ErrCode; import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; @@ -1577,4 +1580,31 @@ public TStorageFormat getStorageFormat() { } return tableProperty.getStorageFormat(); } + + public boolean satisfyHashDistribution(List hashExprs) { + if (hashExprs == null) { + return false; + } + DistributionInfo distribution = getDefaultDistributionInfo(); + if(distribution instanceof HashDistributionInfo) { + List distributeColumns = + ((HashDistributionInfo)distribution).getDistributionColumns(); + PartitionInfo childPartitionInfo = 
getPartitionInfo(); + if (childPartitionInfo instanceof RangePartitionInfo) { + List rangeColumns = ((RangePartitionInfo)childPartitionInfo).getPartitionColumns(); + if (!distributeColumns.containsAll(rangeColumns)) { + return false; + } + } + List partitionSlots = + hashExprs.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList()); + if (partitionSlots.contains(null)) { + return false; + } + List hashColumns = partitionSlots.stream() + .map(SlotRef::getDesc).map(SlotDescriptor::getColumn).collect(Collectors.toList()); + return hashColumns.containsAll(distributeColumns); + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 94c429d81ea888..bb6c8323e9c7ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -863,6 +863,14 @@ private PlanFragment createMergeAggregationFragment(AggregationNode node, PlanFr // childFragment.addPlanRoot(node); // return childFragment; // } + + PlanNode childPlan = childFragment.getPlanRoot(); + if (childPlan instanceof OlapScanNode && + ((OlapScanNode)childPlan).getOlapTable().satisfyHashDistribution(partitionExprs)) { + childFragment.addPlanRoot(node); + return childFragment; + } + // the parent fragment is partitioned on the grouping exprs; // substitute grouping exprs to reference the *output* of the agg, not the input partitionExprs = Expr.substituteList(partitionExprs, diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 006a402f048ab8..98dac0dccc90ca 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -308,6 +308,30 @@ public static void beforeClass() throws 
Exception { "\"database\" = \"db1\",\n" + "\"table\" = \"tbl1\"\n" + ");"); + + createTable("CREATE TABLE test.`table_partitioned` (\n" + + " `dt` int(11) NOT NULL COMMENT \"\",\n" + + " `dis_key` varchar(20) NOT NULL COMMENT \"\"\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(`dt`, `dis_key`)\n" + + "PARTITION BY RANGE(`dt`)\n" + + "(PARTITION p20200101 VALUES [(\"-1\"), (\"20200101\")),\n" + + "PARTITION p20200201 VALUES [(\"20200101\"), (\"20200201\")))\n" + + "DISTRIBUTED BY HASH(`dt`, `dis_key`) BUCKETS 2\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ");"); + + createTable("CREATE TABLE test.`table_unpartitioned` (\n" + + " `dt` int(11) NOT NULL COMMENT \"\",\n" + + " `dis_key` varchar(20) NOT NULL COMMENT \"\"\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(`dt`, `dis_key`)\n" + + "COMMENT \"OLAP\"\n" + + "DISTRIBUTED BY HASH(`dt`, `dis_key`) BUCKETS 2\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ");"); } @AfterClass @@ -1037,6 +1061,21 @@ public void testInformationFunctions() throws Exception { infoFunc.analyze(analyzer); Assert.assertEquals("'root'@'%'", infoFunc.getStrValue()); } + + @Test + public void testAggregateSatisfyOlapTableDistribution() throws Exception { + FeConstants.runningUnitTest = true; + connectContext.setDatabase("default_cluster:test"); + String sql = "SELECT dt, dis_key, COUNT(1) FROM table_unpartitioned group by dt, dis_key"; + String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql); + System.out.println(explainString); + Assert.assertTrue(explainString.contains("AGGREGATE (update finalize)")); + + sql = "SELECT dt, dis_key, COUNT(1) FROM table_partitioned group by dt, dis_key"; + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql); + System.out.println(explainString); + Assert.assertTrue(explainString.contains("AGGREGATE (update finalize)")); + } } From 8adbd1356cf112894c7021ec850b4adb3525ea9c Mon Sep 17 00:00:00 2001 From: liulijia Date: 
Sun, 30 Aug 2020 16:23:16 +0800 Subject: [PATCH 2/2] Optimize code style. --- .../org/apache/doris/catalog/OlapTable.java | 22 ++++++++++++++----- .../doris/planner/DistributedPlanner.java | 19 ++++++++-------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 8ac0e82f9deb04..748a8885ecc789 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.alter.MaterializedViewHandler; +import org.apache.doris.analysis.AggregateInfo; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; @@ -1581,23 +1582,32 @@ public TStorageFormat getStorageFormat() { return tableProperty.getStorageFormat(); } - public boolean satisfyHashDistribution(List hashExprs) { - if (hashExprs == null) { + // For non partitioned table: + // The table's distribute hash columns need to be a subset of the aggregate columns. + // + // For partitioned table: + // 1. The table's partition columns need to be a subset of the table's hash columns. + // 2. The table's distribute hash columns need to be a subset of the aggregate columns. + public boolean meetAggDistributionRequirements(AggregateInfo aggregateInfo) { + ArrayList groupingExps = aggregateInfo.getGroupingExprs(); + if (groupingExps == null || groupingExps.isEmpty()) { return false; } + List partitionExps = aggregateInfo.getPartitionExprs() != null ? 
+ aggregateInfo.getPartitionExprs() : groupingExps; DistributionInfo distribution = getDefaultDistributionInfo(); if(distribution instanceof HashDistributionInfo) { List distributeColumns = ((HashDistributionInfo)distribution).getDistributionColumns(); - PartitionInfo childPartitionInfo = getPartitionInfo(); - if (childPartitionInfo instanceof RangePartitionInfo) { - List rangeColumns = ((RangePartitionInfo)childPartitionInfo).getPartitionColumns(); + PartitionInfo partitionInfo = getPartitionInfo(); + if (partitionInfo instanceof RangePartitionInfo) { + List rangeColumns = ((RangePartitionInfo)partitionInfo).getPartitionColumns(); if (!distributeColumns.containsAll(rangeColumns)) { return false; } } List partitionSlots = - hashExprs.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList()); + partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList()); if (partitionSlots.contains(null)) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index bb6c8323e9c7ed..4eb9d709a70665 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -825,7 +825,16 @@ private PlanFragment createAggregationFragment( if (isDistinct) { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { - return createMergeAggregationFragment(node, childFragment); + + // Check table's distribution. See #4481. 
+ PlanNode childPlan = childFragment.getPlanRoot(); + if (childPlan instanceof OlapScanNode && + ((OlapScanNode) childPlan).getOlapTable().meetAggDistributionRequirements(node.getAggInfo())) { + childFragment.addPlanRoot(node); + return childFragment; + } else { + return createMergeAggregationFragment(node, childFragment); + } } } @@ -863,14 +872,6 @@ private PlanFragment createMergeAggregationFragment(AggregationNode node, PlanFr // childFragment.addPlanRoot(node); // return childFragment; // } - - PlanNode childPlan = childFragment.getPlanRoot(); - if (childPlan instanceof OlapScanNode && - ((OlapScanNode)childPlan).getOlapTable().satisfyHashDistribution(partitionExprs)) { - childFragment.addPlanRoot(node); - return childFragment; - } - // the parent fragment is partitioned on the grouping exprs; // substitute grouping exprs to reference the *output* of the agg, not the input partitionExprs = Expr.substituteList(partitionExprs,