diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6f814653dd4bec..143c175095d82e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -794,15 +794,27 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { } } + /* + Although sometimes the scan range only involves one instance, + the data distribution cannot be set to UNPARTITION here. + The reason is that @coordicator will not set the scan range for the fragment, + when data partition of fragment is UNPARTITION. + */ public DataPartition constructInputPartitionByDistributionInfo() { - DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); - Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); - List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); - List dataDistributeExprs = Lists.newArrayList(); - for (Column column : distributeColumns) { - SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); - dataDistributeExprs.add(slotRef); - } - return DataPartition.hashPartitioned(dataDistributeExprs); + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId()) + || olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED + || olapTable.getPartitions().size() == 1) { + DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); + Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); + List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + List dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return DataPartition.hashPartitioned(dataDistributeExprs); + } else { + return DataPartition.RANDOM; + } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 5c28b224f3d042..a2fd126c14ac0b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -50,8 +50,14 @@ public static void setUp() throws Exception { // create table test_colocate (k1 int ,k2 int, k3 int, k4 int) // distributed by hash(k1, k2) buckets 10 // properties ("replication_num" = "2"); - String createTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) " - + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2');"; + String createColocateTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) " + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2'," + + "'colocate_with' = 'group1');"; + CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx); + Catalog.getCurrentCatalog().createTable(createColocateTableStmt); + String createTblStmtStr = "create table db1.test(k1 int, k2 int, k3 int, k4 int)" + + "partition by range(k1) (partition p1 values less than (\"1\"), partition p2 values less than (\"2\"))" + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')"; CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); Catalog.getCurrentCatalog().createTable(createTableStmt); } @@ -118,4 +124,17 @@ public void sqlAggMoreThanTableMeta() throws Exception { Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); } + + // without: + // 1. agg columns = distributed columns + // 2. table is not in colocate group + // 3. more then 1 instances + // Fixed #6028 + @Test + public void sqlAggWithNonColocateTable() throws Exception { + String sql = "explain select k1, k2 from db1.test group by k1, k2"; + String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertFalse(plan1.contains(COLOCATE_ENABLE)); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index a026992f582092..1edd790acab230 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1359,7 +1359,7 @@ public void testAggregateSatisfyOlapTableDistribution() throws Exception { sql = "SELECT dt, dis_key, COUNT(1) FROM table_partitioned group by dt, dis_key"; explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql); System.out.println(explainString); - Assert.assertTrue(explainString.contains("AGGREGATE (update finalize)")); + Assert.assertTrue(explainString.contains("AGGREGATE (update serialize)")); } public void testLeadAndLagFunction() throws Exception {