diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index f2e7b0e5a188b9..f46581e3004b7b 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -132,4 +132,6 @@ public final class FeMetaVersion { public static final int VERSION_60 = 60; // for alter job v2 public static final int VERSION_61 = 61; + // add param: doris_shuffle_partitions + public static final int VERSION_62 = 62; } diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 4b874e53cf0ead..46807207a23a00 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -201,9 +201,9 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); this.queryGlobals.setTimestamp_ms(new Date().getTime()); if (context.getSessionVariable().getTimeZone().equals("CST")) { - this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE); + this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE); } else { - this.queryGlobals.setTime_zone(context.getSessionVariable().getTimeZone()); + this.queryGlobals.setTime_zone(context.getSessionVariable().getTimeZone()); } this.tResourceInfo = new TResourceInfo(context.getQualifiedUser(), context.getSessionVariable().getResourceGroup()); @@ -848,18 +848,36 @@ private void computeFragmentHosts() throws Exception { // be BE that fragment's scannode locates, avoid less data. // chenhao added boolean hasUnionNode = containsUnionNode(fragment.getPlanRoot()); + if (!(leftMostNode instanceof ScanNode) && !hasUnionNode) { // there is no leftmost scan; we assign the same hosts as those of our // leftmost input fragment (so that a partitioned aggregation // fragment runs on the hosts that provide the input data) PlanFragmentId inputFragmentIdx = - fragments.get(i).getChild(0).getFragmentId(); + fragments.get(i).getChild(0).getFragmentId(); // AddAll() soft copy() - for (FInstanceExecParam execParams - : fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, - 0, params); - params.instanceExecParams.add(instanceParam); + int exchangeInstances = -1; + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { + exchangeInstances = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel(); + } + if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > exchangeInstances) { + // random select some instance + // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances + Set hostSet = Sets.newHashSet(); + for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { + hostSet.add(execParams.host); + } + List hosts = Lists.newArrayList(hostSet); + Collections.shuffle(hosts, instanceRandom); + for (int index = 0; index < exchangeInstances; index++) { + FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params); + params.instanceExecParams.add(instanceParam); + } + } else { + for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params); + params.instanceExecParams.add(instanceParam); + } } // When group by cardinality is smaller than number of backend, only some backends always @@ -1159,7 +1177,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { updateCommitInfos(params.getCommitInfos()); } profileDoneSignal.markedCountDown(params.getFragment_instance_id(), -1L); - } + } if (params.isSetLoaded_rows()) { Catalog.getCurrentCatalog().getLoadManager().updateJobLoadedRows(jobId, params.query_id, params.loaded_rows); @@ -1310,7 +1328,7 @@ protected class FragmentExecParams { public List inputFragments = Lists.newArrayList(); public List instanceExecParams = Lists.newArrayList(); public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment(); - + public FragmentExecParams(PlanFragment fragment) { this.fragment = fragment; } diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 7463a524c35439..d33e3159882097 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -73,6 +73,8 @@ public class SessionVariable implements Serializable, Writable { public static final int MAX_EXEC_INSTANCE_NUM = 32; // if set to true, some of stmt will be forwarded to master FE to get result public static final String FORWARD_TO_MASTER = "forward_to_master"; + // user can set instance num after exchange, no need to be equal to nums of before exchange + public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -158,6 +160,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = TIME_ZONE) private String timeZone = "CST"; + @VariableMgr.VarAttr(name = PARALLEL_EXCHANGE_INSTANCE_NUM) + private int exchangeInstanceParallel = -1; + // The current time zone @VariableMgr.VarAttr(name = SQL_SAFE_UPDATES) private int sqlSafeUpdates = 0; @@ -428,6 +433,10 @@ public int getParallelExecInstanceNum() { return parallelExecInstanceNum; } + public int getExchangeInstanceParallel() { + return exchangeInstanceParallel; + } + public void setParallelExecInstanceNum(int parallelExecInstanceNum) { if (parallelExecInstanceNum < MIN_EXEC_INSTANCE_NUM) { this.parallelExecInstanceNum = MIN_EXEC_INSTANCE_NUM; @@ -441,7 +450,7 @@ public void setParallelExecInstanceNum(int parallelExecInstanceNum) { public boolean getEnableInsertStrict() { return enableInsertStrict; } public void setEnableInsertStrict(boolean enableInsertStrict) { this.enableInsertStrict = enableInsertStrict; } - + // Serialize to thrift object public boolean getForwardToMaster() { return forwardToMaster; @@ -456,13 +465,13 @@ public void setForwardToMaster(boolean forwardToMaster) { public TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMem_limit(maxExecMemByte); - + // TODO chenhao, reservation will be calculated by cost tResult.setMin_reservation(0); tResult.setMax_reservation(maxExecMemByte); - tResult.setInitial_reservation_total_claims(maxExecMemByte); + tResult.setInitial_reservation_total_claims(maxExecMemByte); tResult.setBuffer_pool_limit(maxExecMemByte); - + tResult.setQuery_timeout(queryTimeoutS); tResult.setIs_report_success(isReportSucc); tResult.setCodegen_level(codegenLevel); @@ -502,8 +511,9 @@ public void write(DataOutput out) throws IOException { out.writeLong(maxExecMemByte); Text.writeString(out, collationServer); out.writeInt(batchSize); - out.writeBoolean(disableStreamPreaggregations); + out.writeBoolean(disableStreamPreaggregations); out.writeInt(parallelExecInstanceNum); + out.writeInt(exchangeInstanceParallel); } @Override @@ -542,5 +552,9 @@ public void readFields(DataInput in) throws IOException { disableStreamPreaggregations = in.readBoolean(); parallelExecInstanceNum = in.readInt(); } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) { + exchangeInstanceParallel = in.readInt(); + } + } }