From 3733db6f34950a13b27d675018fb99caf3471f9e Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Wed, 11 Sep 2019 22:57:42 +0800 Subject: [PATCH 01/22] add param: doris_exchange_instances to set parallel after exchange --- .../apache/doris/common/FeMetaVersion.java | 2 + .../java/org/apache/doris/qe/Coordinator.java | 43 ++++++++++++++----- .../org/apache/doris/qe/SessionVariable.java | 24 ++++++++--- 3 files changed, 53 insertions(+), 16 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 9e7c42ce2c5357..4c3fec434bb9dc 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -130,4 +130,6 @@ public final class FeMetaVersion { public static final int VERSION_59 = 59; // refactor date literal public static final int VERSION_60 = 60; + // add param: doris_shuffle_partitions + public static final int Version_61 = 61; } diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index c583ad18b77d5f..bac7008693478c 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -201,9 +201,9 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); this.queryGlobals.setTimestamp_ms(new Date().getTime()); if (context.getSessionVariable().getTimeZone().equals("CST")) { - this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE); + this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE); } else { - this.queryGlobals.setTime_zone(context.getSessionVariable().getTimeZone()); + this.queryGlobals.setTime_zone(context.getSessionVariable().getTimeZone()); } this.tResourceInfo = new TResourceInfo(context.getQualifiedUser(), context.getSessionVariable().getResourceGroup()); @@ -226,7 +226,7 @@ public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, this.queryOptions = new TQueryOptions(); this.queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); this.queryGlobals.setTimestamp_ms(new Date().getTime()); - this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE); + this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE); this.tResourceInfo = new TResourceInfo("", ""); this.needReport = true; this.clusterName = cluster; @@ -848,18 +848,39 @@ private void computeFragmentHosts() throws Exception { // be BE that fragment's scannode locates, avoid less data. // chenhao added boolean hasUnionNode = containsUnionNode(fragment.getPlanRoot()); + if (!(leftMostNode instanceof ScanNode) && !hasUnionNode) { // there is no leftmost scan; we assign the same hosts as those of our // leftmost input fragment (so that a partitioned aggregation // fragment runs on the hosts that provide the input data) PlanFragmentId inputFragmentIdx = - fragments.get(i).getChild(0).getFragmentId(); + fragments.get(i).getChild(0).getFragmentId(); // AddAll() soft copy() - for (FInstanceExecParam execParams - : fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, - 0, params); - params.instanceExecParams.add(instanceParam); + int doris_exchange_instances= -1; + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { + doris_exchange_instances = ConnectContext.get().getSessionVariable().getDorisExchangeInstances(); + } + if (doris_exchange_instances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > doris_exchange_instances) { + // random select some instance + List hosts = Lists.newArrayList(); + Set cache = new HashSet(); + for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { + String hostPort = execParams.host.getHostname() + execParams.host.getPort(); + if (!cache.contains(hostPort)) { + hosts.add(execParams.host); + cache.add(hostPort); + } + } + + for (int index = 0; index < doris_exchange_instances; index++) { + FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()),0, params); + params.instanceExecParams.add(instanceParam); + } + } else { + for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host,0, params); + params.instanceExecParams.add(instanceParam); + } } // When group by cardinality is smaller than number of backend, only some backends always @@ -1159,7 +1180,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { updateCommitInfos(params.getCommitInfos()); } profileDoneSignal.markedCountDown(params.getFragment_instance_id(), -1L); - } + } if (params.isSetLoaded_rows()) { Catalog.getCurrentCatalog().getLoadManager().updateJobLoadedRows(jobId, params.query_id, params.loaded_rows); @@ -1310,7 +1331,7 @@ protected class FragmentExecParams { public List inputFragments = Lists.newArrayList(); public List instanceExecParams = Lists.newArrayList(); public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment(); - + public FragmentExecParams(PlanFragment fragment) { this.fragment = fragment; } diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 7463a524c35439..8c3b422c474efe 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -73,6 +73,8 @@ public class SessionVariable implements Serializable, Writable { public static final int MAX_EXEC_INSTANCE_NUM = 32; // if set to true, some of stmt will be forwarded to master FE to get result public static final String FORWARD_TO_MASTER = "forward_to_master"; + // user can set instance num after exchange, no need to be equal to nums of before exchange + public static final String DORIS_SHUFFLE_PARTITIONS= "doris_exchange_instances"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -158,6 +160,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = TIME_ZONE) private String timeZone = "CST"; + @VariableMgr.VarAttr(name = DORIS_SHUFFLE_PARTITIONS) + private int dorisExchangeInstances = -1; + // The current time zone @VariableMgr.VarAttr(name = SQL_SAFE_UPDATES) private int sqlSafeUpdates = 0; @@ -428,6 +433,10 @@ public int getParallelExecInstanceNum() { return parallelExecInstanceNum; } + public int getDorisExchangeInstances() { + return dorisExchangeInstances; + } + public void setParallelExecInstanceNum(int parallelExecInstanceNum) { if (parallelExecInstanceNum < MIN_EXEC_INSTANCE_NUM) { this.parallelExecInstanceNum = MIN_EXEC_INSTANCE_NUM; @@ -441,7 +450,7 @@ public void setParallelExecInstanceNum(int parallelExecInstanceNum) { public boolean getEnableInsertStrict() { return enableInsertStrict; } public void setEnableInsertStrict(boolean enableInsertStrict) { this.enableInsertStrict = enableInsertStrict; } - + // Serialize to thrift object public boolean getForwardToMaster() { return forwardToMaster; @@ -456,13 +465,13 @@ public void setForwardToMaster(boolean forwardToMaster) { public TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMem_limit(maxExecMemByte); - + // TODO chenhao, reservation will be calculated by cost tResult.setMin_reservation(0); tResult.setMax_reservation(maxExecMemByte); - tResult.setInitial_reservation_total_claims(maxExecMemByte); + tResult.setInitial_reservation_total_claims(maxExecMemByte); tResult.setBuffer_pool_limit(maxExecMemByte); - + tResult.setQuery_timeout(queryTimeoutS); tResult.setIs_report_success(isReportSucc); tResult.setCodegen_level(codegenLevel); @@ -502,8 +511,9 @@ public void write(DataOutput out) throws IOException { out.writeLong(maxExecMemByte); Text.writeString(out, collationServer); out.writeInt(batchSize); - out.writeBoolean(disableStreamPreaggregations); + out.writeBoolean(disableStreamPreaggregations); out.writeInt(parallelExecInstanceNum); + out.writeInt(dorisExchangeInstances); } @Override @@ -542,5 +552,9 @@ public void readFields(DataInput in) throws IOException { disableStreamPreaggregations = in.readBoolean(); parallelExecInstanceNum = in.readInt(); } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.Version_61) { + dorisExchangeInstances = in.readInt(); + } + } } From f29e753a650821cef1d9fa125901f3dd10bce274 Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Thu, 12 Sep 2019 11:09:08 +0800 Subject: [PATCH 02/22] Update fe/src/main/java/org/apache/doris/common/FeMetaVersion.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/common/FeMetaVersion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 4c3fec434bb9dc..fb5db92409b02e 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -131,5 +131,5 @@ public final class FeMetaVersion { // refactor date literal public static final int VERSION_60 = 60; // add param: doris_shuffle_partitions - public static final int Version_61 = 61; + public static final int VERSION_61 = 61; } From 7503f7441e7fe38f053be4934e8169ef8d84913b Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Thu, 12 Sep 2019 11:09:23 +0800 Subject: [PATCH 03/22] Update fe/src/main/java/org/apache/doris/qe/SessionVariable.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 8c3b422c474efe..663a1a41cfbd35 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -74,7 +74,7 @@ public class SessionVariable implements Serializable, Writable { // if set to true, some of stmt will be forwarded to master FE to get result public static final String FORWARD_TO_MASTER = "forward_to_master"; // user can set instance num after exchange, no need to be equal to nums of before exchange - public static final String DORIS_SHUFFLE_PARTITIONS= "doris_exchange_instances"; + public static final String DORIS_SHUFFLE_PARTITIONS = "doris_exchange_instances"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) From c4780f5d3c7bad888eacb890bc5e573cb30ffb33 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 11:26:09 +0800 Subject: [PATCH 04/22] shuffle hosts --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index bac7008693478c..17e8b77d22cb2e 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -871,7 +871,9 @@ private void computeFragmentHosts() throws Exception { cache.add(hostPort); } } - + + // when doris_exchange_instances > hosts size, single host may execute severval instances + Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < doris_exchange_instances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()),0, params); params.instanceExecParams.add(instanceParam); From 75b76dd015474d9d0b08c57aeae83b278e06dda1 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 11:31:10 +0800 Subject: [PATCH 05/22] fix Version_61 to VERSION_61 --- fe/src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 663a1a41cfbd35..8edd7c12e7732f 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -552,7 +552,7 @@ public void readFields(DataInput in) throws IOException { disableStreamPreaggregations = in.readBoolean(); parallelExecInstanceNum = in.readInt(); } - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.Version_61) { + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_61) { dorisExchangeInstances = in.readInt(); } From d596f575da515afb42693dfa12cc344d660e9572 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 15:04:20 +0800 Subject: [PATCH 06/22] add comment --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 17e8b77d22cb2e..c5b16b64f9ba84 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -862,6 +862,7 @@ private void computeFragmentHosts() throws Exception { } if (doris_exchange_instances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > doris_exchange_instances) { // random select some instance + // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances List hosts = Lists.newArrayList(); Set cache = new HashSet(); for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { @@ -872,7 +873,6 @@ private void computeFragmentHosts() throws Exception { } } - // when doris_exchange_instances > hosts size, single host may execute severval instances Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < doris_exchange_instances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()),0, params); From ba7f627262349828c6470d266d4edf5eafb5541f Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 15:10:24 +0800 Subject: [PATCH 07/22] remove tab --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index c5b16b64f9ba84..5da2578257d8b2 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -873,7 +873,7 @@ private void computeFragmentHosts() throws Exception { } } - Collections.shuffle(hosts, instanceRandom); + Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < doris_exchange_instances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()),0, params); params.instanceExecParams.add(instanceParam); From ba682fa5a8470a4959729787d53e4a0fd7d07348 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 17:01:10 +0800 Subject: [PATCH 08/22] resolve conflics --- fe/src/main/java/org/apache/doris/common/FeMetaVersion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index b7810d1b1da9c4..f46581e3004b7b 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -133,5 +133,5 @@ public final class FeMetaVersion { // for alter job v2 public static final int VERSION_61 = 61; // add param: doris_shuffle_partitions - public static final int VERSION_61 = 62; + public static final int VERSION_62 = 62; } From 6dd4422fbe9d78e2eba0ecd438191d5c15e84cbc Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Thu, 12 Sep 2019 17:28:32 +0800 Subject: [PATCH 09/22] Update fe/src/main/java/org/apache/doris/qe/Coordinator.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 3ab45812ecac39..cd85e8c4a5c1e2 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -856,7 +856,7 @@ private void computeFragmentHosts() throws Exception { PlanFragmentId inputFragmentIdx = fragments.get(i).getChild(0).getFragmentId(); // AddAll() soft copy() - int doris_exchange_instances= -1; + int doris_exchange_instances = -1; if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { doris_exchange_instances = ConnectContext.get().getSessionVariable().getDorisExchangeInstances(); } From 27479b76c3945753556699e647493a4c5db965ac Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Thu, 12 Sep 2019 17:28:46 +0800 Subject: [PATCH 10/22] Update fe/src/main/java/org/apache/doris/qe/Coordinator.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index cd85e8c4a5c1e2..c22b766f705aa2 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -875,7 +875,7 @@ private void computeFragmentHosts() throws Exception { Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < doris_exchange_instances; index++) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()),0, params); + FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params); params.instanceExecParams.add(instanceParam); } } else { From f50333488f4348fd1336ebf9f98f6542643a2c1b Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 18:14:39 +0800 Subject: [PATCH 11/22] fix --- .../main/java/org/apache/doris/qe/Coordinator.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 3ab45812ecac39..d5e044e610e3fd 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -863,19 +863,15 @@ private void computeFragmentHosts() throws Exception { if (doris_exchange_instances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > doris_exchange_instances) { // random select some instance // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances - List hosts = Lists.newArrayList(); - Set cache = new HashSet(); + Set cache = new HashSet(); for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { - String hostPort = execParams.host.getHostname() + execParams.host.getPort(); - if (!cache.contains(hostPort)) { - hosts.add(execParams.host); - cache.add(hostPort); + if (!cache.contains(execParams.host)) { + cache.add(execParams.host); } } - - Collections.shuffle(hosts, instanceRandom); + TNetworkAddress[] hosts = (TNetworkAddress[])cache.toArray(); for (int index = 0; index < doris_exchange_instances; index++) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()),0, params); + FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts[index % hosts.length], 0, params); params.instanceExecParams.add(instanceParam); } } else { From fefff1b11ddf056801a600c9dec2789449a34e78 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 18:19:52 +0800 Subject: [PATCH 12/22] rm tab --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index e4323b67ef8ae4..9d04114b3e47ae 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -869,7 +869,7 @@ private void computeFragmentHosts() throws Exception { cache.add(execParams.host); } } - TNetworkAddress[] hosts = (TNetworkAddress[])cache.toArray(); + TNetworkAddress[] hosts = (TNetworkAddress[])cache.toArray(); for (int index = 0; index < doris_exchange_instances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts[index % hosts.length], 0, params); params.instanceExecParams.add(instanceParam); From 0f800757e231dde48536da7a8af7443bb7afe082 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 19:02:48 +0800 Subject: [PATCH 13/22] add shuffle --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 9d04114b3e47ae..53c585c6ff145e 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -869,9 +869,11 @@ private void computeFragmentHosts() throws Exception { cache.add(execParams.host); } } - TNetworkAddress[] hosts = (TNetworkAddress[])cache.toArray(); + List hosts = new ArrayList(); + hosts.addAll(cache); + Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < doris_exchange_instances; index++) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts[index % hosts.length], 0, params); + FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params); params.instanceExecParams.add(instanceParam); } } else { From 7befafa43b03f305a94ad6043a356641e7b8d0e1 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 20:29:52 +0800 Subject: [PATCH 14/22] reaname cache --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 53c585c6ff145e..faaa3ddb114672 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -863,14 +863,14 @@ private void computeFragmentHosts() throws Exception { if (doris_exchange_instances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > doris_exchange_instances) { // random select some instance // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances - Set cache = new HashSet(); + Set addresses = new HashSet(); for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { - if (!cache.contains(execParams.host)) { - cache.add(execParams.host); + if (!addresses.contains(execParams.host)) { + addresses.add(execParams.host); } } List hosts = new ArrayList(); - hosts.addAll(cache); + hosts.addAll(addresses); Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < doris_exchange_instances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params); From 27e8b0b56a03e72c6631004215670aaf77578186 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Thu, 12 Sep 2019 14:16:34 +0000 Subject: [PATCH 15/22] rename variables --- .../main/java/org/apache/doris/qe/Coordinator.java | 8 ++++---- .../java/org/apache/doris/qe/SessionVariable.java | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index faaa3ddb114672..a2ca6d0f1f89df 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -856,11 +856,11 @@ private void computeFragmentHosts() throws Exception { PlanFragmentId inputFragmentIdx = fragments.get(i).getChild(0).getFragmentId(); // AddAll() soft copy() - int doris_exchange_instances = -1; + int exchangeInstances = -1; if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { - doris_exchange_instances = ConnectContext.get().getSessionVariable().getDorisExchangeInstances(); + exchangeInstances = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel(); } - if (doris_exchange_instances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > doris_exchange_instances) { + if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > exchangeInstances) { // random select some instance // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances Set addresses = new HashSet(); @@ -872,7 +872,7 @@ private void computeFragmentHosts() throws Exception { List hosts = new ArrayList(); hosts.addAll(addresses); Collections.shuffle(hosts, instanceRandom); - for (int index = 0; index < doris_exchange_instances; index++) { + for (int index = 0; index < exchangeInstances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params); params.instanceExecParams.add(instanceParam); } diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 79c47f1c0eb0bd..c15f1a9b65a08f 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -74,7 +74,7 @@ public class SessionVariable implements Serializable, Writable { // if set to true, some of stmt will be forwarded to master FE to get result public static final String FORWARD_TO_MASTER = "forward_to_master"; // user can set instance num after exchange, no need to be equal to nums of before exchange - public static final String DORIS_SHUFFLE_PARTITIONS = "doris_exchange_instances"; + public static final String EXCHANGE_INSTANCE_PARALLEL = "exchange_instance_parallel"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -160,8 +160,8 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = TIME_ZONE) private String timeZone = "CST"; - @VariableMgr.VarAttr(name = DORIS_SHUFFLE_PARTITIONS) - private int dorisExchangeInstances = -1; + @VariableMgr.VarAttr(name = EXCHANGE_INSTANCE_PARALLEL) + private int exchangeInstanceParallel = -1; // The current time zone @VariableMgr.VarAttr(name = SQL_SAFE_UPDATES) @@ -433,8 +433,8 @@ public int getParallelExecInstanceNum() { return parallelExecInstanceNum; } - public int getDorisExchangeInstances() { - return dorisExchangeInstances; + public int getExchangeInstanceParallel() { + return exchangeInstanceParallel; } public void setParallelExecInstanceNum(int parallelExecInstanceNum) { @@ -513,7 +513,7 @@ public void write(DataOutput out) throws IOException { out.writeInt(batchSize); out.writeBoolean(disableStreamPreaggregations); out.writeInt(parallelExecInstanceNum); - out.writeInt(dorisExchangeInstances); + out.writeInt(exchangeInstanceParallel); } @Override @@ -553,7 +553,7 @@ public void readFields(DataInput in) throws IOException { parallelExecInstanceNum = in.readInt(); } if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) { - dorisExchangeInstances = in.readInt(); + exchangeInstanceParallel = in.readInt(); } } From b25651d54d02d5655880dc30ae19c585d61a535f Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Sat, 14 Sep 2019 14:59:50 +0800 Subject: [PATCH 16/22] Update fe/src/main/java/org/apache/doris/qe/SessionVariable.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index c15f1a9b65a08f..feae5bf3aad250 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -74,7 +74,7 @@ public class SessionVariable implements Serializable, Writable { // if set to true, some of stmt will be forwarded to master FE to get result public static final String FORWARD_TO_MASTER = "forward_to_master"; // user can set instance num after exchange, no need to be equal to nums of before exchange - public static final String EXCHANGE_INSTANCE_PARALLEL = "exchange_instance_parallel"; + public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) From 8a2123beb7ee05710c49b4b926b41214da48fdf2 Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Sat, 14 Sep 2019 15:00:22 +0800 Subject: [PATCH 17/22] Update fe/src/main/java/org/apache/doris/qe/Coordinator.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index a2ca6d0f1f89df..0661c2e1b8e294 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -863,7 +863,7 @@ private void computeFragmentHosts() throws Exception { if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > exchangeInstances) { // random select some instance // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances - Set addresses = new HashSet(); + Set hostSet = Sets.newHashSet(); for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { if (!addresses.contains(execParams.host)) { addresses.add(execParams.host); From 916dce93d3504b22a7ea48c59681904b7ec67773 Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Sat, 14 Sep 2019 15:00:50 +0800 Subject: [PATCH 18/22] Update fe/src/main/java/org/apache/doris/qe/Coordinator.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 0661c2e1b8e294..d7896a63db181a 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -869,7 +869,7 @@ private void computeFragmentHosts() throws Exception { addresses.add(execParams.host); } } - List hosts = new ArrayList(); + List hosts = Lists.newArrayList(hostSet); hosts.addAll(addresses); Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < exchangeInstances; index++) { From c3fb0e3fcae3d8b4a294fb9e2780c727ad55ade5 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Sat, 14 Sep 2019 15:46:04 +0800 Subject: [PATCH 19/22] rename variables --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 6 +++--- fe/src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index d7896a63db181a..46168383595298 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -865,12 +865,12 @@ private void computeFragmentHosts() throws Exception { // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances Set hostSet = Sets.newHashSet(); for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { - if (!addresses.contains(execParams.host)) { - addresses.add(execParams.host); + if (!hostSet.contains(execParams.host)) { + hostSet.add(execParams.host); } } List hosts = Lists.newArrayList(hostSet); - hosts.addAll(addresses); + hosts.addAll(hostSet); Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < exchangeInstances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params); diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index feae5bf3aad250..d33e3159882097 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -160,7 +160,7 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = TIME_ZONE) private String timeZone = "CST"; - @VariableMgr.VarAttr(name = EXCHANGE_INSTANCE_PARALLEL) + @VariableMgr.VarAttr(name = PARALLEL_EXCHANGE_INSTANCE_NUM) private int exchangeInstanceParallel = -1; // The current time zone From a27a0f0e3e7d5bf2744d647da3c25c25196344e7 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Mon, 16 Sep 2019 08:26:00 +0800 Subject: [PATCH 20/22] rm addAll --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 46168383595298..915e657c22373f 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -870,7 +870,6 @@ private void computeFragmentHosts() throws Exception { } } List hosts = Lists.newArrayList(hostSet); - hosts.addAll(hostSet); Collections.shuffle(hosts, instanceRandom); for (int index = 0; index < exchangeInstances; index++) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params); From d94666420d3e264bb990ee69bc5fcdc242c87a2f Mon Sep 17 00:00:00 2001 From: xionglei0 <55084968+xionglei0@users.noreply.github.com> Date: Mon, 16 Sep 2019 13:56:46 +0800 Subject: [PATCH 21/22] Update fe/src/main/java/org/apache/doris/qe/Coordinator.java Co-Authored-By: ZHAO Chun --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 915e657c22373f..1a2d252c6ac56c 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -877,7 +877,7 @@ private void computeFragmentHosts() throws Exception { } } else { for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host,0, params); + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params); params.instanceExecParams.add(instanceParam); } } From e57d98dd88314ef8a7c7f95701542d935824aab0 Mon Sep 17 00:00:00 2001 From: xionglei1 Date: Mon, 16 Sep 2019 14:06:53 +0800 Subject: [PATCH 22/22] add host without judge --- fe/src/main/java/org/apache/doris/qe/Coordinator.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 1a2d252c6ac56c..46807207a23a00 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -865,9 +865,7 @@ private void computeFragmentHosts() throws Exception { // get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances Set hostSet = Sets.newHashSet(); for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) { - if (!hostSet.contains(execParams.host)) { - hostSet.add(execParams.host); - } + hostSet.add(execParams.host); } List hosts = Lists.newArrayList(hostSet); Collections.shuffle(hosts, instanceRandom);