From c7cfe7c4ac1bcfae74529e083941101421ee2dd6 Mon Sep 17 00:00:00 2001 From: wutiangan Date: Thu, 23 Jul 2020 10:57:28 +0800 Subject: [PATCH 1/4] [BUG] fix 4149, add sessionVariable to choose broadcastjoin first when cardinality cannot be estimated --- docs/en/administrator-guide/variables.md | 4 ++++ docs/zh-CN/administrator-guide/variables.md | 3 +++ .../doris/planner/DistributedPlanner.java | 15 ++++++++++++++- .../org/apache/doris/qe/SessionVariable.java | 8 ++++++++ .../apache/doris/planner/QueryPlanTest.java | 19 +++++++++++++++++++ 5 files changed, 48 insertions(+), 1 deletion(-) diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 5091be45e016c9..5b7cc26fde28fd 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -338,3 +338,7 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `rewrite_count_distinct_to_bitmap_hll` Whether to rewrite count distinct queries of bitmap and HLL types as bitmap_union_count and hll_union_agg. + +* `prefer_broadcast_join` + + When choosing the join method(broadcast join or shuffle join), if the broadcast join cost and shuffle join cost are equal, which join method should we prefer. 
diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index 3d215754586ff2..e47e4488a00171 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -338,3 +338,6 @@ SET forward_to_master = concat('tr', 'u', 'e'); 是否将 bitmap 和 hll 类型的 count distinct 查询重写为 bitmap_union_count 和 hll_union_agg 。 +* `prefer_broadcast_join` + + 在选择join的具体实现方式是broadcast join还是shuffle join时,如果broadcast join cost和shuffle join cost相等时,优先选择哪种join方式。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 25be3af500079c..8e177097e7e70b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -280,6 +280,19 @@ private PlanFragment createScanFragment(PlanNode node) { } } + /** + * When broadcastCost and partitionCost are equal, there is no uniform standard for which join implementation is better. + * Some scenarios are suitable for broadcast join, and some scenarios are suitable for shuffle join. + * Therefore, we add a SessionVariable to help users choose a better join implementation. + */ + private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) { + if (ConnectContext.get().getSessionVariable().isPreferBroadcastJoin()) { + return broadcastCost <= partitionCost; + } else { + return broadcastCost < partitionCost; + } + } + /** * Creates either a broadcast join or a repartitioning join, depending on the expected cost. If any of the inputs to * the cost computation is unknown, it assumes the cost will be 0. 
Costs being equal, it'll favor partitioned over @@ -351,7 +364,7 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ && (perNodeMemLimit == 0 || Math.round( (double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) <= perNodeMemLimit) && (node.getInnerRef().isBroadcastJoin() || (!node.getInnerRef().isPartitionJoin() - && broadcastCost < partitionCost))) { + && isBroadcastCostSmaller(broadcastCost, partitionCost)))) { doBroadcast = true; } else { doBroadcast = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 393cdb02ca6c87..8d45680f667b8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -74,6 +74,7 @@ public class SessionVariable implements Serializable, Writable { public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; + public static final String PREFER_BROADCAST_JOIN = "prefer_broadcast_join"; public static final int MIN_EXEC_INSTANCE_NUM = 1; public static final int MAX_EXEC_INSTANCE_NUM = 32; // if set to true, some of stmt will be forwarded to master FE to get result @@ -209,6 +210,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN) private boolean disableColocateJoin = false; + @VariableMgr.VarAttr(name = PREFER_BROADCAST_JOIN) + private boolean preferBroadcastJoin = true; + /* * the parallel exec instance num for one Fragment in one BE * 1 means disable this feature @@ -397,6 +401,10 @@ public boolean isDisableColocateJoin() { return disableColocateJoin; } + public boolean isPreferBroadcastJoin() {return preferBroadcastJoin; } + + public void 
setPreferBroadcastJoin(boolean preferBroadcastJoin) {this.preferBroadcastJoin = preferBroadcastJoin; } + public int getParallelExecInstanceNum() { return parallelExecInstanceNum; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 95fa585e314aae..1e9fa31c905df5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -907,4 +907,23 @@ public void testJoinWithMysqlTable() throws Exception { explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); Assert.assertFalse(explainString.contains("INNER JOIN (PARTITIONED)")); } + + @Test + public void testPreferBroadcastJoin() throws Exception { + connectContext.setDatabase("default_cluster:test"); + String queryStr = "explain select * from (select k1 from jointest group by k1)t2, jointest t1 where t1.k1 = t2.k1"; + + // default set PreferBroadcastJoin true + String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)")); + + connectContext.getSessionVariable().setPreferBroadcastJoin(false); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("INNER JOIN (PARTITIONED)")); + + + connectContext.getSessionVariable().setPreferBroadcastJoin(true); + explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)")); + } } From bdafba2f22ccb53e899942e58fe7d76a4191ae7d Mon Sep 17 00:00:00 2001 From: wutiangan Date: Thu, 23 Jul 2020 14:17:21 +0800 Subject: [PATCH 2/4] change the sessionVariable's name --- docs/en/administrator-guide/variables.md | 2 +- docs/zh-CN/administrator-guide/variables.md | 2 +- .../org/apache/doris/planner/DistributedPlanner.java | 3 ++- 
.../main/java/org/apache/doris/qe/SessionVariable.java | 10 +++++----- .../src/main/java/org/apache/doris/qe/VariableMgr.java | 6 ++++++ .../java/org/apache/doris/planner/QueryPlanTest.java | 4 ++-- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 5b7cc26fde28fd..82873ad8593047 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -339,6 +339,6 @@ SET forward_to_master = concat('tr', 'u', 'e'); Whether to rewrite count distinct queries of bitmap and HLL types as bitmap_union_count and hll_union_agg. -* `prefer_broadcast_join` +* `prefer_join_method` When choosing the join method(broadcast join or shuffle join), if the broadcast join cost and shuffle join cost are equal, which join method should we prefer. diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index e47e4488a00171..af88433c88ad02 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -338,6 +338,6 @@ SET forward_to_master = concat('tr', 'u', 'e'); 是否将 bitmap 和 hll 类型的 count distinct 查询重写为 bitmap_union_count 和 hll_union_agg 。 -* `prefer_broadcast_join` +* `prefer_join_method` 在选择join的具体实现方式是broadcast join还是shuffle join时,如果broadcast join cost和shuffle join cost相等时,优先选择哪种join方式。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 8e177097e7e70b..b174c22638eb3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -286,7 +286,8 @@ private PlanFragment createScanFragment(PlanNode node) { * Therefore, we add a SessionVariable to help users choose a better join implementation. 
*/ private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) { - if (ConnectContext.get().getSessionVariable().isPreferBroadcastJoin()) { + String joinMethod = ConnectContext.get().getSessionVariable().getPreferJoinMethod(); + if (joinMethod.equals("broadcast")) { return broadcastCost <= partitionCost; } else { return broadcastCost < partitionCost; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8d45680f667b8c..c2a0b889bcee0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -74,7 +74,7 @@ public class SessionVariable implements Serializable, Writable { public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; - public static final String PREFER_BROADCAST_JOIN = "prefer_broadcast_join"; + public static final String PREFER_JOIN_METHOD = "prefer_join_method"; public static final int MIN_EXEC_INSTANCE_NUM = 1; public static final int MAX_EXEC_INSTANCE_NUM = 32; // if set to true, some of stmt will be forwarded to master FE to get result @@ -210,8 +210,8 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN) private boolean disableColocateJoin = false; - @VariableMgr.VarAttr(name = PREFER_BROADCAST_JOIN) - private boolean preferBroadcastJoin = true; + @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD) + private String preferJoinMethod = "broadcast"; /* * the parallel exec instance num for one Fragment in one BE @@ -401,9 +401,9 @@ public boolean isDisableColocateJoin() { return disableColocateJoin; } - public boolean isPreferBroadcastJoin() {return preferBroadcastJoin; } + public String 
getPreferJoinMethod() {return preferJoinMethod; } - public void setPreferBroadcastJoin(boolean preferBroadcastJoin) {this.preferBroadcastJoin = preferBroadcastJoin; } + public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; } public int getParallelExecInstanceNum() { return parallelExecInstanceNum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index 3574ae5cd6616e..b48fbdc66aeb93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -244,6 +244,12 @@ public static void setVar(SessionVariable sessionVariable, SetVar setVar) throws } } + if (setVar.getVariable().toLowerCase().equals("prefer_join_method")) { + if (!value.toLowerCase().equals("broadcast") && !value.toLowerCase().equals("shuffle")) { + ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, "prefer_join_method", value); + } + } + if (setVar.getType() == SetType.GLOBAL) { wlock.lock(); try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 1e9fa31c905df5..433671ee3b45cb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -917,12 +917,12 @@ public void testPreferBroadcastJoin() throws Exception { String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)")); - connectContext.getSessionVariable().setPreferBroadcastJoin(false); + connectContext.getSessionVariable().setPreferJoinMethod("shuffle"); explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); Assert.assertTrue(explainString.contains("INNER JOIN (PARTITIONED)")); - 
connectContext.getSessionVariable().setPreferBroadcastJoin(true); + connectContext.getSessionVariable().setPreferJoinMethod("broadcast"); explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)")); } From 7b0a80eb2f6f64c5ca0a16cb9d6e724a0c629f43 Mon Sep 17 00:00:00 2001 From: wutiangan Date: Thu, 23 Jul 2020 14:39:36 +0800 Subject: [PATCH 3/4] add optional values for doc --- docs/en/administrator-guide/variables.md | 2 ++ docs/zh-CN/administrator-guide/variables.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md index 82873ad8593047..a41396a578827d 100644 --- a/docs/en/administrator-guide/variables.md +++ b/docs/en/administrator-guide/variables.md @@ -342,3 +342,5 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `prefer_join_method` When choosing the join method(broadcast join or shuffle join), if the broadcast join cost and shuffle join cost are equal, which join method should we prefer. + + Currently, the valid values for this variable are "broadcast" and "shuffle". 
diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md index af88433c88ad02..f9e6654410ae26 100644 --- a/docs/zh-CN/administrator-guide/variables.md +++ b/docs/zh-CN/administrator-guide/variables.md @@ -341,3 +341,5 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `prefer_join_method` 在选择join的具体实现方式是broadcast join还是shuffle join时,如果broadcast join cost和shuffle join cost相等时,优先选择哪种join方式。 + + 目前该变量的可选值为"broadcast" 或者 "shuffle"。 From 8b43c5dfb143975751d08a53ae5adf38270f5ab4 Mon Sep 17 00:00:00 2001 From: wutiangan Date: Tue, 28 Jul 2020 17:08:55 +0800 Subject: [PATCH 4/4] change equals to equalsIgnoreCase --- .../main/java/org/apache/doris/planner/DistributedPlanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index b174c22638eb3f..88775e161f3f39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -287,7 +287,7 @@ private PlanFragment createScanFragment(PlanNode node) { */ private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) { String joinMethod = ConnectContext.get().getSessionVariable().getPreferJoinMethod(); - if (joinMethod.equals("broadcast")) { + if (joinMethod.equalsIgnoreCase("broadcast")) { return broadcastCost <= partitionCost; } else { return broadcastCost < partitionCost;