Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3733db6
add param: doris_exchange_instances to set parallel after exchange
Sep 11, 2019
f29e753
Update fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
xionglei0 Sep 12, 2019
7503f74
Update fe/src/main/java/org/apache/doris/qe/SessionVariable.java
xionglei0 Sep 12, 2019
c4780f5
shuffle hosts
Sep 12, 2019
75b76dd
fix Version_61 to VERSION_61
Sep 12, 2019
d596f57
add comment
Sep 12, 2019
ba7f627
remove tab
Sep 12, 2019
e3d2a2d
resolve confics
Sep 12, 2019
ba682fa
resolve conflics
Sep 12, 2019
6dd4422
Update fe/src/main/java/org/apache/doris/qe/Coordinator.java
xionglei0 Sep 12, 2019
27479b7
Update fe/src/main/java/org/apache/doris/qe/Coordinator.java
xionglei0 Sep 12, 2019
f503334
fix
Sep 12, 2019
d5036c2
fix
Sep 12, 2019
fefff1b
rm tab
Sep 12, 2019
0f80075
add shuffle
Sep 12, 2019
7befafa
reaname cache
Sep 12, 2019
27e8b0b
rename variables
Sep 12, 2019
b25651d
Update fe/src/main/java/org/apache/doris/qe/SessionVariable.java
xionglei0 Sep 14, 2019
8a2123b
Update fe/src/main/java/org/apache/doris/qe/Coordinator.java
xionglei0 Sep 14, 2019
916dce9
Update fe/src/main/java/org/apache/doris/qe/Coordinator.java
xionglei0 Sep 14, 2019
10ece7a
Merge branch 'master' into doris_exchange_instances
Sep 14, 2019
54fd998
Merge branch 'doris_exchange_instances' of https://github.com/xiongle…
Sep 14, 2019
c3fb0e3
rename variables
Sep 14, 2019
a27a0f0
rm addAll
Sep 16, 2019
d946664
Update fe/src/main/java/org/apache/doris/qe/Coordinator.java
xionglei0 Sep 16, 2019
e57d98d
add host without judge
Sep 16, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,6 @@ public final class FeMetaVersion {
public static final int VERSION_60 = 60;
// for alter job v2
public static final int VERSION_61 = 61;
// add param: doris_shuffle_partitions
public static final int VERSION_62 = 62;
}
38 changes: 28 additions & 10 deletions fe/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.queryGlobals.setNow_string(DATE_FORMAT.format(new Date()));
this.queryGlobals.setTimestamp_ms(new Date().getTime());
if (context.getSessionVariable().getTimeZone().equals("CST")) {
this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE);
this.queryGlobals.setTime_zone(TimeUtils.DEFAULT_TIME_ZONE);
} else {
this.queryGlobals.setTime_zone(context.getSessionVariable().getTimeZone());
this.queryGlobals.setTime_zone(context.getSessionVariable().getTimeZone());
}
this.tResourceInfo = new TResourceInfo(context.getQualifiedUser(),
context.getSessionVariable().getResourceGroup());
Expand Down Expand Up @@ -848,18 +848,36 @@ private void computeFragmentHosts() throws Exception {
// be BE that fragment's scannode locates, avoid less data.
// chenhao added
boolean hasUnionNode = containsUnionNode(fragment.getPlanRoot());

if (!(leftMostNode instanceof ScanNode) && !hasUnionNode) {
// there is no leftmost scan; we assign the same hosts as those of our
// leftmost input fragment (so that a partitioned aggregation
// fragment runs on the hosts that provide the input data)
PlanFragmentId inputFragmentIdx =
fragments.get(i).getChild(0).getFragmentId();
fragments.get(i).getChild(0).getFragmentId();
// AddAll() soft copy()
for (FInstanceExecParam execParams
: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host,
0, params);
params.instanceExecParams.add(instanceParam);
int exchangeInstances = -1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
exchangeInstances = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
}
if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > exchangeInstances) {
// random select some instance
// get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute severval instances
Set<TNetworkAddress> hostSet = Sets.newHashSet();
for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
hostSet.add(execParams.host);
}
List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
Collections.shuffle(hosts, instanceRandom);
for (int index = 0; index < exchangeInstances; index++) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, hosts.get(index % hosts.size()), 0, params);
params.instanceExecParams.add(instanceParam);
}
} else {
for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params);
params.instanceExecParams.add(instanceParam);
}
}

// When group by cardinality is smaller than number of backend, only some backends always
Expand Down Expand Up @@ -1159,7 +1177,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
updateCommitInfos(params.getCommitInfos());
}
profileDoneSignal.markedCountDown(params.getFragment_instance_id(), -1L);
}
}

if (params.isSetLoaded_rows()) {
Catalog.getCurrentCatalog().getLoadManager().updateJobLoadedRows(jobId, params.query_id, params.loaded_rows);
Expand Down Expand Up @@ -1310,7 +1328,7 @@ protected class FragmentExecParams {
public List<PlanFragmentId> inputFragments = Lists.newArrayList();
public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment();

public FragmentExecParams(PlanFragment fragment) {
this.fragment = fragment;
}
Expand Down
24 changes: 19 additions & 5 deletions fe/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class SessionVariable implements Serializable, Writable {
public static final int MAX_EXEC_INSTANCE_NUM = 32;
// if set to true, some of stmt will be forwarded to master FE to get result
public static final String FORWARD_TO_MASTER = "forward_to_master";
// user can set instance num after exchange, no need to be equal to nums of before exchange
public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num";

// max memory used on every backend.
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
Expand Down Expand Up @@ -158,6 +160,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = TIME_ZONE)
private String timeZone = "CST";

@VariableMgr.VarAttr(name = PARALLEL_EXCHANGE_INSTANCE_NUM)
private int exchangeInstanceParallel = -1;

// The current time zone
@VariableMgr.VarAttr(name = SQL_SAFE_UPDATES)
private int sqlSafeUpdates = 0;
Expand Down Expand Up @@ -428,6 +433,10 @@ public int getParallelExecInstanceNum() {
return parallelExecInstanceNum;
}

public int getExchangeInstanceParallel() {
return exchangeInstanceParallel;
}

public void setParallelExecInstanceNum(int parallelExecInstanceNum) {
if (parallelExecInstanceNum < MIN_EXEC_INSTANCE_NUM) {
this.parallelExecInstanceNum = MIN_EXEC_INSTANCE_NUM;
Expand All @@ -441,7 +450,7 @@ public void setParallelExecInstanceNum(int parallelExecInstanceNum) {
public boolean getEnableInsertStrict() { return enableInsertStrict; }
public void setEnableInsertStrict(boolean enableInsertStrict) { this.enableInsertStrict = enableInsertStrict; }


// Serialize to thrift object
public boolean getForwardToMaster() {
return forwardToMaster;
Expand All @@ -456,13 +465,13 @@ public void setForwardToMaster(boolean forwardToMaster) {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMem_limit(maxExecMemByte);

// TODO chenhao, reservation will be calculated by cost
tResult.setMin_reservation(0);
tResult.setMax_reservation(maxExecMemByte);
tResult.setInitial_reservation_total_claims(maxExecMemByte);
tResult.setInitial_reservation_total_claims(maxExecMemByte);
tResult.setBuffer_pool_limit(maxExecMemByte);

tResult.setQuery_timeout(queryTimeoutS);
tResult.setIs_report_success(isReportSucc);
tResult.setCodegen_level(codegenLevel);
Expand Down Expand Up @@ -502,8 +511,9 @@ public void write(DataOutput out) throws IOException {
out.writeLong(maxExecMemByte);
Text.writeString(out, collationServer);
out.writeInt(batchSize);
out.writeBoolean(disableStreamPreaggregations);
out.writeBoolean(disableStreamPreaggregations);
out.writeInt(parallelExecInstanceNum);
out.writeInt(exchangeInstanceParallel);
}

@Override
Expand Down Expand Up @@ -542,5 +552,9 @@ public void readFields(DataInput in) throws IOException {
disableStreamPreaggregations = in.readBoolean();
parallelExecInstanceNum = in.readInt();
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_62) {
exchangeInstanceParallel = in.readInt();
}

}
}