From bf354096896b42f5c67a42c9d88f7dbcce9f7103 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Tue, 15 Jun 2021 16:55:12 +0800 Subject: [PATCH 1/5] [Bug-fix] Fix wrong data distribution judgment The Fragment where OlapScanNode is located has three data distribution possibilities. 1. UNPARTITIONED: The scan range of OlapScanNode contains only one instance(BE) 2. RANDOM: Involving multi-partitioned tables in OlapScanNode. 3. HASH_PARTITIONED: The involving table is in the colocate group. For a multi-partition table, although the data in each individual partition is distributed according to the bucketing column, the same bucketing column between different partitions is not necessarily in the same be. So the data distribution is RANDOM. If Doris wrongly plan RANDOM as HASH_PARTITIONED, it will lead to the wrong colocate agg node. The result of query is incorrect. Fixed #6028 --- .../apache/doris/planner/OlapScanNode.java | 24 ++++++++++++------- .../doris/planner/ColocatePlanTest.java | 22 +++++++++++++++-- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6f814653dd4bec..6b3a0e68331c46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -795,14 +795,20 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { } public DataPartition constructInputPartitionByDistributionInfo() { - DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); - Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); - List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); - List dataDistributeExprs = Lists.newArrayList(); - for (Column column : distributeColumns) { - SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); - dataDistributeExprs.add(slotRef); - } - return DataPartition.hashPartitioned(dataDistributeExprs); + if (getNumInstances() == 1) { + return DataPartition.UNPARTITIONED; + } else if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { + DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); + Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); + List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + List dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return DataPartition.hashPartitioned(dataDistributeExprs); + } else { + return DataPartition.RANDOM; + } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 5c28b224f3d042..84f198215dec16 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -50,8 +50,13 @@ public static void setUp() throws Exception { // create table test_colocate (k1 int ,k2 int, k3 int, k4 int) // distributed by hash(k1, k2) buckets 10 // properties ("replication_num" = "2"); - String createTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) " - + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2');"; + String createColocateTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) " + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2'," + + "'colocate_with' = 'group1');"; + CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx); + Catalog.getCurrentCatalog().createTable(createColocateTableStmt); + String createTblStmtStr = "create table db1.test(k1 int, k2 int, k3 int, k4 int)" + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')"; CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); Catalog.getCurrentCatalog().createTable(createTableStmt); } @@ -118,4 +123,17 @@ public void sqlAggMoreThanTableMeta() throws Exception { Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); } + + // without: + // 1. agg columns = distributed columns + // 2. table is not in colocate group + // 3. more then 1 instances + // Fixed #6028 + @Test + public void sqlAggWithNonColocateTable() throws Exception { + String sql = "explain select k1, k2 from db1.test group by k1, k2"; + String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertFalse(plan1.contains(COLOCATE_ENABLE)); + } } From a635259ce3e88efa5dfcf8db90b96aef0a0c60ed Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 16 Jun 2021 15:41:55 +0800 Subject: [PATCH 2/5] Remove unpartitioned --- .../src/main/java/org/apache/doris/planner/OlapScanNode.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6b3a0e68331c46..af853c9ad19102 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -795,9 +795,7 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { } public DataPartition constructInputPartitionByDistributionInfo() { - if (getNumInstances() == 1) { - return DataPartition.UNPARTITIONED; - } else if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); From a4c2ba8d6514aa799a59cb705e9779d870a958b6 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 16 Jun 2021 15:59:20 +0800 Subject: [PATCH 3/5] remove unpartition --- .../main/java/org/apache/doris/planner/OlapScanNode.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index af853c9ad19102..b96ff3fa8e4c35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -794,6 +794,12 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { } } + /* + Although sometimes the scan range only involves one instance, + the data distribution cannot be set to UNPARTITION here. + The reason is that @coordicator will not set the scan range for the fragment, + when data partition of fragment is UNPARTITION. + */ public DataPartition constructInputPartitionByDistributionInfo() { if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); From 994408869c586b8e42b628b91497ee740039fef2 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 16 Jun 2021 19:40:43 +0800 Subject: [PATCH 4/5] Fix ut --- .../src/main/java/org/apache/doris/planner/OlapScanNode.java | 4 +++- .../src/test/java/org/apache/doris/planner/QueryPlanTest.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index b96ff3fa8e4c35..143c175095d82e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -801,7 +801,9 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { when data partition of fragment is UNPARTITION. */ public DataPartition constructInputPartitionByDistributionInfo() { - if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId()) + || olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED + || olapTable.getPartitions().size() == 1) { DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index a026992f582092..1edd790acab230 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1359,7 +1359,7 @@ public void testAggregateSatisfyOlapTableDistribution() throws Exception { sql = "SELECT dt, dis_key, COUNT(1) FROM table_partitioned group by dt, dis_key"; explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql); System.out.println(explainString); - Assert.assertTrue(explainString.contains("AGGREGATE (update finalize)")); + Assert.assertTrue(explainString.contains("AGGREGATE (update serialize)")); } public void testLeadAndLagFunction() throws Exception { From 567dae36c1e5dc32e7f980bf2bd856c7eba9e608 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 17 Jun 2021 10:28:04 +0800 Subject: [PATCH 5/5] ut --- .../src/test/java/org/apache/doris/planner/ColocatePlanTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 84f198215dec16..a2fd126c14ac0b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -56,6 +56,7 @@ public static void setUp() throws Exception { CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx); Catalog.getCurrentCatalog().createTable(createColocateTableStmt); String createTblStmtStr = "create table db1.test(k1 int, k2 int, k3 int, k4 int)" + + "partition by range(k1) (partition p1 values less than (\"1\"), partition p2 values less than (\"2\"))" + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')"; CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); Catalog.getCurrentCatalog().createTable(createTableStmt);