Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1585,10 +1585,14 @@ private void computeFragmentExecParams() throws Exception {
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
Expand Down Expand Up @@ -1623,10 +1627,14 @@ private void computeFragmentExecParams() throws Exception {
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
// add destination host to this fragment's destination
int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int j = 0; j < parallelTasksNum; ++j) {
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
Expand Down Expand Up @@ -1698,10 +1706,14 @@ private void computeMultiCastFragmentParams() throws Exception {
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
Expand Down Expand Up @@ -1736,7 +1748,13 @@ private void computeMultiCastFragmentParams() throws Exception {
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3175,7 +3175,8 @@ public boolean isMaterializedViewRewriteEnableContainForeignTable() {
}

public boolean isIgnoreStorageDataDistribution() {
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle
&& enableNereidsPlanner;
}

public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ suite("test_runtime_filter") {
sql "set enable_fallback_to_original_planner=false"
sql "set disable_join_reorder=true"

sql "set ignore_storage_data_distribution=false"
explain{
sql ("""select * from rf_tblA join rf_tblB on a < b""")
contains "runtime filters: RF000[max] -> a"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ suite("test_bitmap_filter_nereids") {
exception "Doris hll, bitmap, array, map, struct, jsonb, variant column must use with specific function, and don't support filter"
}

sql "set ignore_storage_data_distribution=false"
explain{
sql "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) order by k1;"
contains "RF000[bitmap]"
Expand Down