
Colocate plan #5589

@EmmyMiao87

Description


Is your feature request related to a problem? Please describe.

Doris currently supports creating colocate tables and performing colocate joins.
However, the planner does not yet take full advantage of colocation.
For example, when an Aggregation Node sits between a Hash Join Node and a Scan Node, a colocate join cannot be performed.
Similarly, Aggregation, Sort, and Set Operation nodes could be executed together with their child nodes when the data distribution matches, but this is not currently supported.

Describe the solution you'd like

When the data distribution allows it, upper-level operators can be absorbed into the fragment of the lower-level operators.
This avoids unnecessary network transmission, serialization, and deserialization.

Describe alternatives you've considered

The colocate optimization mainly targets the following four plan nodes:
Hash Join Node
Set Operation Node
Aggregation Node
Sort Node (in window functions)

Step 1: Create table

CREATE TABLE `test_colocate` (
  `k1` int(11) NULL COMMENT "",
  `k2` int(11) NULL COMMENT "",
  `k3` int(11) NULL COMMENT "",
  `k4` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`, `k3`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 10
PROPERTIES (
"replication_num" = "2",
"in_memory" = "false",
"storage_format" = "V2"
);

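Step 2: Colocate join

In the plan below, the aggregation (node 1) sits between the hash join and the left-hand scan, and the join is still executed as a colocate join (colocate: true).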

explain select * from (select k1, k2 from test_colocate group by k1, k2) a , test_colocate b where a.k1=b.k1 and a.k2=b.k2;
+-------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                         |
|  OUTPUT EXPRS:<slot 2> `k1` | <slot 3> `k2` | `b`.`k1` | `b`.`k2` | `b`.`k3` | `b`.`k4`                                 |
|   PARTITION: UNPARTITIONED                                                                                              |
|                                                                                                                         |
|   RESULT SINK                                                                                                           |
|                                                                                                                         |
|   4:EXCHANGE                                                                                                            |
|                                                                                                                         |
| PLAN FRAGMENT 1                                                                                                         |
|  OUTPUT EXPRS:                                                                                                          |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
|                                                                                                                         |
|   STREAM DATA SINK                                                                                                      |
|     EXCHANGE ID: 04                                                                                                     |
|     UNPARTITIONED                                                                                                       |
|                                                                                                                         |
|   3:HASH JOIN                                                                                                           |
|   |  join op: INNER JOIN                                                                                                |
|   |  hash predicates:                                                                                                   |
|   |  colocate: true                                                                                                     |
|   |  equal join conjunct: <slot 2> `k1` = `b`.`k1`                                                                      |
|   |  equal join conjunct: <slot 3> `k2` = `b`.`k2`                                                                      |
|   |                                                                                                                     |
|   |----2:OlapScanNode                                                                                                   |
|   |       TABLE: test_colocate                                                                                          |
|   |                                                                                                                     |
|   1:AGGREGATE (update finalize)                                                                                         |
|   |  group by: `k1`, `k2`                                                                                               |
|   |                                                                                                                     |
|   0:OlapScanNode                                                                                                        |
|      TABLE: test_colocate                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------+
47 rows in set (0.020 sec)
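The aggregation does not block the colocate join because GROUP BY k1, k2 keeps every group inside the bucket that already holds its rows, so both join inputs stay bucketed on k1, k2. The Python sketch below checks that reasoning on toy data. It is an illustration, not Doris code: bucket_of is a stand-in hash (Doris's real bucket hash is not modeled), and global_plan and colocate_plan are hypothetical names. Running the aggregation and the join independently inside each bucket should give the same rows as running them over the whole table.

```python
from collections import defaultdict
from zlib import crc32

BUCKETS = 10   # mirrors "BUCKETS 10" in the example table
DIST = (0, 1)  # positions of k1, k2 in a row (k1, k2, k3, k4)

def bucket_of(key):
    """Stand-in bucket function; any deterministic hash illustrates the idea."""
    return crc32(repr(key).encode()) % BUCKETS

def key(row, idx):
    return tuple(row[i] for i in idx)

def group_by(rows, idx):
    """SELECT k1, k2 ... GROUP BY k1, k2 reduces to the distinct keys."""
    return sorted({key(r, idx) for r in rows})

def hash_join(left, right, l_idx, r_idx):
    """In-memory hash join on key(left, l_idx) = key(right, r_idx)."""
    table = defaultdict(list)
    for r in right:
        table[key(r, r_idx)].append(r)
    return [l + r for l in left for r in table[key(l, l_idx)]]

def global_plan(rows):
    """Reference result: aggregate and join over the whole table."""
    a = group_by(rows, DIST)
    return hash_join(a, rows, (0, 1), DIST)

def colocate_plan(rows):
    """Aggregate and join inside each bucket; no row crosses a bucket."""
    buckets = defaultdict(list)
    for r in rows:
        buckets[bucket_of(key(r, DIST))].append(r)
    out = []
    for local in buckets.values():
        a = group_by(local, DIST)  # groups are complete: GROUP BY covers DIST
        out.extend(hash_join(a, local, (0, 1), DIST))
    return out

rows = [(i % 4, i % 3, i, 0) for i in range(24)]
```

Comparing sorted(colocate_plan(rows)) with sorted(global_plan(rows)) shows the two plans agree on this data.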

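Step 3: Colocate aggregation node
Condition: the input partition of the aggregation node (its GROUP BY columns) must contain (>=) all columns of the data partition of the child fragment (here the distribution columns k1, k2).

explain select k1, k2 from test_colocate where k1=1 group by k1, k2;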
+-------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                         |
|  OUTPUT EXPRS:<slot 2> `k1` | <slot 3> `k2`                                                                             |
|   PARTITION: UNPARTITIONED                                                                                              |
|                                                                                                                         |
|   RESULT SINK                                                                                                           |
|                                                                                                                         |
|   2:EXCHANGE                                                                                                            |
|                                                                                                                         |
| PLAN FRAGMENT 1                                                                                                         |
|  OUTPUT EXPRS:                                                                                                          |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
|                                                                                                                         |
|   STREAM DATA SINK                                                                                                      |
|     EXCHANGE ID: 02                                                                                                     |
|     UNPARTITIONED                                                                                                       |
|                                                                                                                         |
|   1:AGGREGATE (update finalize)                                                                                         |
|   |  group by: `k1`, `k2`                                                                                               |
|   |                                                                                                                     |
|   0:OlapScanNode                                                                                                        |
|      TABLE: test_colocate                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------+
30 rows in set (0.011 sec)
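The Step 3 condition amounts to a subset test. The sketch below pairs that planner-style check with an empirical one that hashes rows into buckets and confirms that no group spans two buckets. Function names are hypothetical and bucket_of is the same kind of stand-in hash, not Doris's implementation.

```python
from zlib import crc32

BUCKETS = 10

def bucket_of(values):
    """Stand-in bucket function (not Doris's actual hash)."""
    return crc32(repr(tuple(values)).encode()) % BUCKETS

def can_colocate_agg(group_by_cols, distribution_cols):
    """Step 3 condition: the GROUP BY columns contain every distribution column."""
    return set(distribution_cols) <= set(group_by_cols)

def groups_are_bucket_local(rows, group_idx, dist_idx):
    """Empirical check: all rows of each group fall into exactly one bucket."""
    seen = {}
    for r in rows:
        g = tuple(r[i] for i in group_idx)
        b = bucket_of(r[i] for i in dist_idx)
        if seen.setdefault(g, b) != b:
            return False
    return True
```

For the query above, can_colocate_agg(["k1", "k2"], ["k1", "k2"]) holds. Grouping by a superset such as k1, k2, k3 also qualifies, because those keys still determine k1, k2.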

Step 4: Sort node (in window function)
Condition:
The sort columns of the sort node must contain (>=) all columns of the data partition of the child fragment.

The sort columns are k1 and k2.

explain select k1, sum(k2) over(partition by k1 order by k2) from test_colocate;
+-------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                         |
|  OUTPUT EXPRS:<slot 4> <slot 0> | <slot 3>                                                                              |
|   PARTITION: UNPARTITIONED                                                                                              |
|                                                                                                                         |
|   RESULT SINK                                                                                                           |
|                                                                                                                         |
|   3:EXCHANGE                                                                                                            |
|                                                                                                                         |
| PLAN FRAGMENT 1                                                                                                         |
|  OUTPUT EXPRS:                                                                                                          |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
|                                                                                                                         |
|   STREAM DATA SINK                                                                                                      |
|     EXCHANGE ID: 03                                                                                                     |
|     UNPARTITIONED                                                                                                       |
|                                                                                                                         |
|   2:ANALYTIC                                                                                                            |
|   |  functions: [, sum(<slot 5> ), ]                                                                                    |
|   |  partition by: `k1`                                                                                                 |
|   |  order by: <slot 5>  ASC                                                                                            |
|   |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW                                                          |
|   |                                                                                                                     |
|   1:SORT                                                                                                                |
|   |  order by: <slot 4> <slot 0> ASC, <slot 5>  ASC                                                                     |
|   |  offset: 0                                                                                                          |
|   |                                                                                                                     |
|   0:OlapScanNode                                                                                                        |
|      TABLE: test_colocate                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------+
36 rows in set (0.028 sec)
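The sort node of a window function orders by the PARTITION BY columns followed by the ORDER BY columns, which matches the "order by: <slot 4> <slot 0> ASC, <slot 5> ASC" line (k1, then k2) in the plan above. The sketch below computes those sort columns and applies the Step 4 condition. It also reports a stricter check on the PARTITION BY columns alone, which is an assumption of this sketch and not part of the proposal: the analytic node evaluates each window partition independently, and with HASH(k1, k2) rows that share k1 but differ in k2 may land in different buckets. All names are hypothetical.

```python
def sort_columns(partition_by, order_by):
    """Analytic sort order: PARTITION BY columns first, then ORDER BY columns."""
    return list(partition_by) + [c for c in order_by if c not in partition_by]

def meets_step4_condition(partition_by, order_by, distribution_cols):
    """Condition as stated in Step 4: sort columns contain the distribution columns."""
    return set(distribution_cols) <= set(sort_columns(partition_by, order_by))

def partitions_are_bucket_local(partition_by, distribution_cols):
    """Stricter check (assumption): every window partition lies in one bucket."""
    return set(distribution_cols) <= set(partition_by)
```

For sum(k2) over(partition by k1 order by k2) on a table distributed by k1, k2, the first check passes and the stricter one does not.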

Colocate set operation node
Condition 1: the data partition of every child fragment must contain (>=) the input partition of the set operation node.
Condition 2: no child fragment contains an exchange node, which means the data has not been rehashed.
Condition 3: all scan nodes belong to the same colocate group.

Step 1: Create tables

CREATE TABLE `t1` (
  `id` int(11) COMMENT "",
  `value` varchar(8) COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"colocate_with" = "t1",
"replication_num" = "2"
);

CREATE TABLE `t2` (
  `id` int(11) COMMENT "",
  `value` varchar(8) COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"colocate_with" = "t1",
"replication_num" = "2"
);

CREATE TABLE `t3` (
  `id` int(11) COMMENT "",
  `value` varchar(8) COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"colocate_with" = "t1",
"replication_num" = "2"
);
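Colocation only works if every table in a group is bucketed the same way. According to the Doris colocation join documentation, tables in one group must share the bucket count, the number and types of distribution columns, and the replication number. The sketch below encodes those rules as an assumption; TableSpec and colocate_mismatches are hypothetical names, not Doris APIs.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TableSpec:
    name: str
    group: str            # value of the "colocate_with" property
    dist_types: tuple     # types of the DISTRIBUTED BY columns, in order
    buckets: int
    replication_num: int

def colocate_mismatches(tables):
    """List the reasons why tables cannot share the first table's colocate group."""
    ref = tables[0]
    problems = []
    for t in tables[1:]:
        if t.group != ref.group:
            problems.append(f"{t.name}: group {t.group!r} != {ref.group!r}")
        if t.dist_types != ref.dist_types:
            problems.append(f"{t.name}: distribution column types differ")
        if t.buckets != ref.buckets:
            problems.append(f"{t.name}: bucket count differs")
        if t.replication_num != ref.replication_num:
            problems.append(f"{t.name}: replication_num differs")
    return problems

# t1, t2, t3 as declared above: group "t1", HASH(int id), 10 buckets, 2 replicas.
tables = [TableSpec(n, "t1", ("int",), 10, 2) for n in ("t1", "t2", "t3")]
```

colocate_mismatches(tables) returns an empty list for the three tables above; a table declared with, say, 8 buckets would be reported.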

Step 2: Colocate query

Tables t1, t2, and t3 are in the same colocate group, and the INTERSECT column id is also their distribution column.

explain select id from t1 intersect select id from t2 intersect select id from t3;
+----------------------------------------------------------------------------------+
| Explain String                                                                   |
+----------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                  |
|  OUTPUT EXPRS:<slot 3> `id` `id` `id`                                            |
|   PARTITION: UNPARTITIONED                                                       |
|                                                                                  |
|   RESULT SINK                                                                    |
|                                                                                  |
|   4:EXCHANGE                                                                     |
|                                                                                  |
| PLAN FRAGMENT 1                                                                  |
|  OUTPUT EXPRS:                                                                   |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`t1`.`id`                  |
|                                                                                  |
|   STREAM DATA SINK                                                               |
|     EXCHANGE ID: 04                                                              |
|     UNPARTITIONED                                                                |
|                                                                                  |
|   0:INTERSECT                                                                    |
|   |  colocate=true                                                               |
|   |                                                                              |
|   |----2:OlapScanNode                                                            |
|   |       TABLE: t2                                                              |
|   |                                                                              |
|   |----3:OlapScanNode                                                            |
|   |       TABLE: t3                                                              |
|   |                                                                              |
|   1:OlapScanNode                                                                 |
|      TABLE: t1                                                                   |
+----------------------------------------------------------------------------------+
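Because t1, t2, and t3 share a colocate group and are bucketed on id, equal ids from all three tables sit in the same bucket, so INTERSECT can run bucket by bucket with no exchange. The sketch below checks this on toy data, using a stand-in bucket_of hash (not Doris's) applied identically to every input to model the shared colocate group; the function names are hypothetical.

```python
from zlib import crc32

BUCKETS = 10

def bucket_of(value):
    """Stand-in bucket function shared by all tables in the group."""
    return crc32(repr(value).encode()) % BUCKETS

def intersect(*inputs):
    """SQL INTERSECT (distinct semantics) over lists of ids."""
    result = set(inputs[0])
    for ids in inputs[1:]:
        result &= set(ids)
    return result

def colocate_intersect(*inputs):
    """Run INTERSECT inside each bucket, then union the per-bucket results."""
    out = set()
    for b in range(BUCKETS):
        local = [[v for v in ids if bucket_of(v) == b] for ids in inputs]
        out |= intersect(*local)
    return out

t1 = list(range(0, 60, 2))
t2 = list(range(0, 60, 3))
t3 = list(range(0, 60, 5))
```

Both intersect(t1, t2, t3) and colocate_intersect(t1, t2, t3) yield the multiples of 30 below 60.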

Additional context

The colocate plan also has some bad cases.
For example, when the data is skewed, the colocate plan eliminates the rehash, so each bucket is processed where it is stored and the query is more affected by the skew.
In this case, it is best to fix the data skew first by choosing a reasonable distribution (bucketing) column.
Alternatively, colocate planning can be turned off through a session variable.
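One rough way to gauge the skew risk described above is to compare the largest bucket with the average bucket: without a rehash, the instance holding the largest bucket tends to bound the query time. The sketch below computes that ratio for a list of distribution-key values; bucket_of is a stand-in hash and bucket_skew is a hypothetical helper, not a Doris tool.

```python
from collections import Counter
from zlib import crc32

BUCKETS = 10

def bucket_of(key):
    """Stand-in bucket function (not Doris's actual hash)."""
    return crc32(repr(key).encode()) % BUCKETS

def bucket_skew(keys):
    """Largest bucket size divided by the mean bucket size (1.0 means balanced)."""
    counts = Counter(bucket_of(k) for k in keys)
    sizes = [counts.get(b, 0) for b in range(BUCKETS)]
    return max(sizes) / (sum(sizes) / BUCKETS)
```

If 900 of 1000 rows share one key value, the ratio is at least 9, meaning one instance does roughly nine times the average work.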

Metadata


Assignees: No one assigned
Labels: area/colocated, area/planner, kind/feature
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
