diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 59b8235511721e..c583ad18b77d5f 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -88,6 +88,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -111,6 +112,9 @@ public class Coordinator { private static String localIP = FrontendOptions.getLocalHostAddress(); + // Random is used to shuffle instances of partitioned + private static Random instanceRandom = new Random(); + // Overall status of the entire query; set to the first reported fragment error // status or to CANCELLED, if Cancel() is called. Status queryStatus = new Status(); @@ -857,7 +861,12 @@ private void computeFragmentHosts() throws Exception { 0, params); params.instanceExecParams.add(instanceParam); } - + + // When group by cardinality is smaller than number of backend, only some backends always + // process while other has no data to process. + // So we shuffle instances to make different backends handle different queries. + Collections.shuffle(params.instanceExecParams, instanceRandom); + // TODO: switch to unpartitioned/coord execution if our input fragment // is executed that way (could have been downgraded from distributed) continue;