Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/administrator-guide/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,9 @@ SET forward_to_master = concat('tr', 'u', 'e');
* `rewrite_count_distinct_to_bitmap_hll`

Whether to rewrite count distinct queries of bitmap and HLL types as bitmap_union_count and hll_union_agg.

* `prefer_join_method`

When choosing the join method(broadcast join or shuffle join), if the broadcast join cost and shuffle join cost are equal, which join method should we prefer.

Currently, the optional values for this variable are "broadcast" or "shuffle".
5 changes: 5 additions & 0 deletions docs/zh-CN/administrator-guide/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,8 @@ SET forward_to_master = concat('tr', 'u', 'e');

是否将 bitmap 和 hll 类型的 count distinct 查询重写为 bitmap_union_count 和 hll_union_agg 。

* `prefer_join_method`

在选择join的具体实现方式是broadcast join还是shuffle join时,如果broadcast join cost和shuffle join cost相等时,优先选择哪种join方式。

目前该变量的可选值为"broadcast" 或者 "shuffle"。
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,20 @@ private PlanFragment createScanFragment(PlanNode node) {
}
}

/**
* When broadcastCost and partitionCost are equal, there is no uniform standard for which join implementation is better.
* Some scenarios are suitable for broadcast join, and some scenarios are suitable for shuffle join.
* Therefore, we add a SessionVariable to help users choose a better join implementation.
*/
private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) {
String joinMethod = ConnectContext.get().getSessionVariable().getPreferJoinMethod();
if (joinMethod.equalsIgnoreCase("broadcast")) {
return broadcastCost <= partitionCost;
} else {
return broadcastCost < partitionCost;
}
}

/**
* Creates either a broadcast join or a repartitioning join, depending on the expected cost. If any of the inputs to
* the cost computation is unknown, it assumes the cost will be 0. Costs being equal, it'll favor partitioned over
Expand Down Expand Up @@ -351,7 +365,7 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ
&& (perNodeMemLimit == 0 || Math.round(
(double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) <= perNodeMemLimit)
&& (node.getInnerRef().isBroadcastJoin() || (!node.getInnerRef().isPartitionJoin()
&& broadcastCost < partitionCost))) {
&& isBroadcastCostSmaller(broadcastCost, partitionCost)))) {
doBroadcast = true;
} else {
doBroadcast = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
public static final String PREFER_JOIN_METHOD = "prefer_join_method";
public static final int MIN_EXEC_INSTANCE_NUM = 1;
public static final int MAX_EXEC_INSTANCE_NUM = 32;
// if set to true, some of stmt will be forwarded to master FE to get result
Expand Down Expand Up @@ -209,6 +210,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
private boolean disableColocateJoin = false;

@VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
private String preferJoinMethod = "broadcast";

/*
* the parallel exec instance num for one Fragment in one BE
* 1 means disable this feature
Expand Down Expand Up @@ -397,6 +401,10 @@ public boolean isDisableColocateJoin() {
return disableColocateJoin;
}

public String getPreferJoinMethod() {return preferJoinMethod; }

public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; }

public int getParallelExecInstanceNum() {
return parallelExecInstanceNum;
}
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ public static void setVar(SessionVariable sessionVariable, SetVar setVar) throws
}
}

if (setVar.getVariable().toLowerCase().equals("prefer_join_method")) {
if (!value.toLowerCase().equals("broadcast") && !value.toLowerCase().equals("shuffle")) {
ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, "prefer_join_method", value);
}
}

if (setVar.getType() == SetType.GLOBAL) {
wlock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,4 +907,23 @@ public void testJoinWithMysqlTable() throws Exception {
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertFalse(explainString.contains("INNER JOIN (PARTITIONED)"));
}

@Test
public void testPreferBroadcastJoin() throws Exception {
connectContext.setDatabase("default_cluster:test");
String queryStr = "explain select * from (select k1 from jointest group by k1)t2, jointest t1 where t1.k1 = t2.k1";

// default set PreferBroadcastJoin true
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));

connectContext.getSessionVariable().setPreferJoinMethod("shuffle");
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("INNER JOIN (PARTITIONED)"));


connectContext.getSessionVariable().setPreferJoinMethod("broadcast");
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));
}
}