Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,37 @@ private PlanFragment createRepeatNodeFragment(
throws UserException {
repeatNode.setNumInstances(childFragment.getPlanRoot().getNumInstances());
childFragment.addPlanRoot(repeatNode);
/*
The Repeat Node will change the data partition of fragment
when the origin data partition of fragment is HashPartition.
For example,
Query: SELECT k1, k2, sum(v1)
FROM table
GROUP BY GROUPING SETS ((k1, k2), (k1), (k2), ( ))
Table schema: table distributed by k1
The Child Fragment:
Fragment 0
Data partition: k1
Repeat Node: repeat 3 lines [[0, 1], [0], [1], []]
OlapScanNode: table
Data before Repeat Node is partitioned by k1 such as:
| Node 1 | | Node 2 |
| 1, 1 | | 2, 1 |
| 1, 2 | | 2, 2 |
Data after Repeat Node is partitioned by RANDOM such as:
| Node 1 | | Node 2 |
| 1, 1 | | 2, 1 |
| 1, 2 | | 2, 2 |
| null,1 | | null,1 |
| null,2 | | null,2 |
...
The Repeat Node will generate some new rows.
The distribution of these new rows is completely inconsistent with the original data distribution,
their distribution is RANDOM.
Therefore, the data distribution method of the fragment needs to be modified here.
Only the correct data distribution can make the correct result when judging **colocate**.
*/
childFragment.updateDataPartition(DataPartition.RANDOM);
return childFragment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// specification of the partition of the input of this fragment;
// an UNPARTITIONED fragment is executed on only a single node
// TODO: improve this comment, "input" is a bit misleading
private final DataPartition dataPartition;
private DataPartition dataPartition;

// specification of the actually input partition of this fragment when transmitting to be.
// By default, the value of the data partition in planner and the data partition transmitted to be are the same.
Expand Down Expand Up @@ -267,6 +267,13 @@ public boolean isPartitioned() {
return (dataPartition.getType() != TPartitionType.UNPARTITIONED);
}

public void updateDataPartition(DataPartition dataPartition) {
if (this.dataPartition == DataPartition.UNPARTITIONED) {
return;
}
this.dataPartition = dataPartition;
}

public PlanFragmentId getId() { return fragmentId; }

public PlanFragment getDestFragment() {
Expand Down