From c88e1a2f0af4b73b9390fcd94d3df8f03fb1ce49 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 11 Mar 2021 20:14:59 +0800 Subject: [PATCH 1/6] [Colocate plan][Step1] Colocate join covers more situations The old colocate join can only cover the case where the child is hash or scan. In fact, as long as the child's data distribution meets the requirements, no matter what the plan node on the child node is, a colocate join can be performed. --- .../java/org/apache/doris/analysis/Expr.java | 23 +- .../analysis/ModifyTablePropertiesClause.java | 4 - .../org/apache/doris/catalog/Catalog.java | 28 +- ...a => ColocateTableCheckerAndBalancer.java} | 17 +- .../java/org/apache/doris/common/Config.java | 20 +- .../planner/DistributedPlanColocateRule.java | 29 ++ .../doris/planner/DistributedPlanner.java | 347 ++++++++++-------- .../apache/doris/planner/HashJoinNode.java | 6 +- .../org/apache/doris/planner/PlanNode.java | 19 + .../java/org/apache/doris/qe/Coordinator.java | 6 +- .../org/apache/doris/qe/SessionVariable.java | 10 +- .../doris/system/SystemInfoService.java | 20 +- ... ColocateTableCheckerAndBalancerTest.java} | 4 +- .../doris/planner/ColocatePlanTest.java | 113 ++++++ .../apache/doris/utframe/UtFrameUtils.java | 21 +- 15 files changed, 442 insertions(+), 225 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/clone/{ColocateTableBalancer.java => ColocateTableCheckerAndBalancer.java} (98%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java rename fe/fe-core/src/test/java/org/apache/doris/clone/{ColocateTableBalancerTest.java => ColocateTableCheckerAndBalancerTest.java} (99%) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 69c996c3c2f4a1..920a54fdc65be0 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -45,12 +45,12 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; -import java.util.HashMap; /** * Root of the expr node hierarchy. @@ -1334,7 +1334,26 @@ public Type castBinaryOp(Type compatibleType) throws AnalysisException { @Override public String toString() { return MoreObjects.toStringHelper(this.getClass()).add("id", id).add("type", type).add("sel", - selectivity).add("#distinct", numDistinctValues).add("scale", outputScale).toString(); + selectivity).add("#distinct", numDistinctValues).add("scale", outputScale).toString(); + } + + public SlotRef getSrcSlotRef() { + SlotRef unwrapSloRef = this.unwrapSlotRef(); + if (unwrapSloRef == null) { + return null; + } + SlotDescriptor slotDescriptor = unwrapSloRef.getDesc(); + if (slotDescriptor == null) { + return null; + } + List sourceExpr = slotDescriptor.getSourceExprs(); + if (sourceExpr == null || sourceExpr.isEmpty()) { + return (SlotRef) this; + } + if (sourceExpr.size() > 1) { + return null; + } + return sourceExpr.get(0).getSrcSlotRef(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 386b4b3eb29be9..4ad057ef5eb6fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -20,7 +20,6 @@ import org.apache.doris.alter.AlterOpType; import org.apache.doris.catalog.TableProperty; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; @@ -49,9 +48,6 @@ public void analyze(Analyzer analyzer) throws AnalysisException { } if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) { - if (Config.disable_colocate_join) { - throw new AnalysisException("Colocate table is disabled by Admin"); - } this.needTableStable = false; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE)) { if (!properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).equalsIgnoreCase("column")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index cd54710f846062..7d76648c1c0d27 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -93,7 +93,7 @@ import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.Replica.ReplicaStatus; import org.apache.doris.catalog.Table.TableType; -import org.apache.doris.clone.ColocateTableBalancer; +import org.apache.doris.clone.ColocateTableCheckerAndBalancer; import org.apache.doris.clone.DynamicPartitionScheduler; import org.apache.doris.clone.TabletChecker; import org.apache.doris.clone.TabletScheduler; @@ -217,11 +217,6 @@ import org.apache.doris.transaction.PublishVersionDaemon; import org.apache.doris.transaction.UpdateDbUsedDataQuotaDaemon; -import org.apache.commons.collections.CollectionUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.codehaus.jackson.map.ObjectMapper; - import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -232,9 +227,10 @@ import com.google.common.collect.Queues; import com.google.common.collect.Range; import com.google.common.collect.Sets; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.BufferedInputStream; import java.io.BufferedReader; @@ -264,6 +260,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import org.codehaus.jackson.map.ObjectMapper; + public class Catalog { private static final Logger LOG = LogManager.getLogger(Catalog.class); // 0 ~ 9999 used for qe @@ -1263,10 +1264,8 @@ private void startMasterOnlyDaemonThreads() { // Tablet checker and scheduler tabletChecker.start(); tabletScheduler.start(); - // Colocate tables balancer - if (!Config.disable_colocate_join) { - ColocateTableBalancer.getInstance().start(); - } + // Colocate tables checker and balancer + ColocateTableCheckerAndBalancer.getInstance().start(); // Publish Version Daemon publishVersionDaemon.start(); // Start txn cleaner @@ -3645,9 +3644,6 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws DdlExcept try { String colocateGroup = PropertyAnalyzer.analyzeColocate(properties); if (colocateGroup != null) { - if (Config.disable_colocate_join) { - ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_FEATURE_DISABLED); - } String fullGroupName = db.getId() + "_" + colocateGroup; ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName); if (groupSchema != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java rename to fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index d317772b9bcc45..d566eb0d7e1aad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -53,24 +53,23 @@ /** * ColocateTableBalancer is responsible for tablets' repair and balance of colocated tables. */ -public class ColocateTableBalancer extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(ColocateTableBalancer.class); +public class ColocateTableCheckerAndBalancer extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(ColocateTableCheckerAndBalancer.class); private static final long CHECK_INTERVAL_MS = 20 * 1000L; // 20 second - private ColocateTableBalancer(long intervalMs) { + private ColocateTableCheckerAndBalancer(long intervalMs) { super("colocate group clone checker", intervalMs); } - private static volatile ColocateTableBalancer INSTANCE = null; - public static ColocateTableBalancer getInstance() { + private static volatile ColocateTableCheckerAndBalancer INSTANCE = null; + public static ColocateTableCheckerAndBalancer getInstance() { if (INSTANCE == null) { - synchronized (ColocateTableBalancer.class) { + synchronized (ColocateTableCheckerAndBalancer.class) { if (INSTANCE == null) { - INSTANCE = new ColocateTableBalancer(CHECK_INTERVAL_MS); + INSTANCE = new ColocateTableCheckerAndBalancer(CHECK_INTERVAL_MS); } - } - } + } } return INSTANCE; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 27206c664a39cc..150149a11740af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -781,10 +781,16 @@ public class Config extends ConfigBase { public static int query_colocate_join_memory_limit_penalty_factor = 1; /** - * Deprecated after 0.10 + * This configs can set to true to disable the automatic colocate tables's relocate and balance. + * If 'disable_colocate_balance' is set to true, + * ColocateTableBalancer will not relocate and balance colocate tables. + * Attention: + * Under normal circumstances, there is no need to turn off balance at all. + * Because once the balance is turned off, the unstable colocate table may not be restored + * Eventually the colocate plan cannot be used when querying. */ - @ConfField - public static boolean disable_colocate_join = false; + @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false; + /** * The default user resource publishing timeout. */ @@ -1093,14 +1099,6 @@ public class Config extends ConfigBase { * Save small files */ @ConfField public static String small_file_dir = PaloFe.DORIS_HOME_DIR + "/small_files"; - - /** - * The following 2 configs can set to true to disable the automatic colocate tables's relocate and balance. - * if 'disable_colocate_relocate' is set to true, ColocateTableBalancer will not relocate colocate tables when Backend unavailable. - * if 'disable_colocate_balance' is set to true, ColocateTableBalancer will not balance colocate tables. - */ - @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_relocate = false; - @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false; /** * If set to true, the insert stmt with processing error will still return a label to user. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java new file mode 100644 index 00000000000000..dc864499b80b8d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +public class DistributedPlanColocateRule { + public static final String SESSION_DISABLED = "Session disabled"; + public static final String HAS_JOIN_HINT = "Has join hint"; + public static final String TRANSFORMED_SRC_COLUMN = "Src column hash been transformed by expr"; + public static final String REDISTRIBUTED_SRC_DATA = "The src data has been redistributed"; + public static final String SUPPORT_ONLY_OLAP_TABLE = "Only olap table support colocate plan"; + public static final String TABLE_NOT_IN_THE_SAME_GROUP = "Tables are not in the same group"; + public static final String COLOCATE_GROUP_IS_NOT_STABLE = "Colocate group is not stable"; + public static final String INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY = "Inconsistent distribution of table and querie"; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 8bbe075f6dec25..592974c8511e0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -24,6 +24,7 @@ import org.apache.doris.analysis.JoinOperator; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.ColocateTableIndex.GroupId; @@ -33,7 +34,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TPartitionType; @@ -47,6 +48,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; + +import avro.shaded.com.google.common.collect.Maps; /** * The distributed planner is responsible for creating an executable, distributed plan @@ -294,16 +298,52 @@ private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) } /** - * Creates either a broadcast join or a repartitioning join, depending on the expected cost. If any of the inputs to - * the cost computation is unknown, it assumes the cost will be 0. Costs being equal, it'll favor partitioned over - * broadcast joins. If perNodeMemLimit > 0 and the size of the hash table for a broadcast join is expected to exceed - * that mem limit, switches to partitioned join instead. TODO: revisit the choice of broadcast as the default TODO: - * don't create a broadcast join if we already anticipate that this will exceed the query's memory budget. + * There are 4 kinds of distributed hash join methods in Doris: + * Colocate, Bucket Shuffle, Broadcast, Shuffle + * The priority between these four distributed execution methods is following: + * Colocate > Bucket Shuffle > Broadcast > Shuffle + * This function is mainly used to choose the most suitable distributed method for the 'node', + * and transform it into PlanFragment. */ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList fragments) throws UserException { + List reason = Lists.newArrayList(); + if (canColocateJoin(node, leftChildFragment, rightChildFragment, reason)) { + node.setColocate(true, ""); + //node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); + node.setChild(0, leftChildFragment.getPlanRoot()); + node.setChild(1, rightChildFragment.getPlanRoot()); + leftChildFragment.setPlanRoot(node); + fragments.remove(rightChildFragment); + return leftChildFragment; + } else { + node.setColocate(false, reason.get(0)); + } + + // bucket shuffle join is better than broadcast and shuffle join + // it can reduce the network cost of join, so doris chose it first + List rhsPartitionxprs = Lists.newArrayList(); + if (canBucketShuffleJoin(node, leftChildFragment, rightChildFragment, rhsPartitionxprs)) { + node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); + DataPartition rhsJoinPartition = + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs); + ExchangeNode rhsExchange = + new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot(), false); + rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances()); + rhsExchange.init(ctx_.getRootAnalyzer()); + + node.setChild(0, leftChildFragment.getPlanRoot()); + node.setChild(1, rhsExchange); + leftChildFragment.setPlanRoot(node); + + rightChildFragment.setDestination(rhsExchange); + rightChildFragment.setOutputPartition(rhsJoinPartition); + + return leftChildFragment; + } + // broadcast: send the rightChildFragment's output to each node executing // the leftChildFragment; the cost across all nodes is proportional to the // total amount of data sent @@ -366,7 +406,7 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ } else if (!node.getInnerRef().isPartitionJoin() && isBroadcastCostSmaller(broadcastCost, partitionCost) && (perNodeMemLimit == 0 - || Math.round((double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) <= perNodeMemLimit)) { + || Math.round((double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) <= perNodeMemLimit)) { doBroadcast = true; } else { doBroadcast = false; @@ -375,49 +415,6 @@ && isBroadcastCostSmaller(broadcastCost, partitionCost) doBroadcast = false; } - // Push down the predicates constructed by the right child when the - // join op is inner join or left semi join. - // Colocate join, Bucket Shuffle join, Broadcast join support local rumtime filter - // For Shuffle join, set is push down false after this code in line:475 - if (node.getJoinOp().isInnerJoin() || node.getJoinOp().isLeftSemiJoin()) { - node.setIsPushDown(true); - } - - List reason = Lists.newArrayList(); - if (canColocateJoin(node, leftChildFragment, rightChildFragment, reason)) { - node.setColocate(true, ""); - //node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); - node.setChild(0, leftChildFragment.getPlanRoot()); - node.setChild(1, rightChildFragment.getPlanRoot()); - leftChildFragment.setPlanRoot(node); - fragments.remove(rightChildFragment); - return leftChildFragment; - } else { - node.setColocate(false, reason.get(0)); - } - - // bucket shuffle join is better than broadcast and shuffle join - // it can reduce the network cost of join, so doris chose it first - List rhsPartitionxprs = Lists.newArrayList(); - if (canBucketShuffleJoin(node, leftChildFragment, rightChildFragment, rhsPartitionxprs)) { - node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); - DataPartition rhsJoinPartition = - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs); - ExchangeNode rhsExchange = - new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot(), false); - rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances()); - rhsExchange.init(ctx_.getRootAnalyzer()); - - node.setChild(0, leftChildFragment.getPlanRoot()); - node.setChild(1, rhsExchange); - leftChildFragment.setPlanRoot(node); - - rightChildFragment.setDestination(rhsExchange); - rightChildFragment.setOutputPartition(rhsJoinPartition); - - return leftChildFragment; - } - if (doBroadcast) { node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST); // Doesn't create a new fragment, but modifies leftChildFragment to execute @@ -471,58 +468,169 @@ && isBroadcastCostSmaller(broadcastCost, partitionCost) rightChildFragment.setDestination(rhsExchange); rightChildFragment.setOutputPartition(rhsJoinPartition); - // Before we support global runtime filter, only shuffle join do not enable local runtime filter + // TODO: Before we support global runtime filter, only shuffle join do not enable local runtime filter node.setIsPushDown(false); return joinFragment; } } + /** + * Colocate Join can be performed when the following 4 conditions are met at the same time. + * 1. Session variables disable_colocate_plan = true + * 2. There is no join hints in HashJoinNode + * 3. There are no exchange node between source scan node and HashJoinNode. + * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched. + */ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment, - List cannotReason) { - if (Config.disable_colocate_join) { - cannotReason.add("Disabled"); + List cannotReason) { + // Condition1 + if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { + cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED); return false; } - if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) { - cannotReason.add("Session disabled"); + // Condition2: If user have a join hint to use proper way of join, can not be colocate join + if (node.getInnerRef().hasJoinHints()) { + cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT); return false; } - // If user have a join hint to use proper way of join, can not be colocate join - if (node.getInnerRef().hasJoinHints()) { - cannotReason.add("Has join hint"); - return false; + // Condition3: + // If there is an exchange node between the HashJoinNode and their real associated ScanNode, + // it means that the data has been rehashed. + // The rehashed data can no longer be guaranteed to correspond to the left and right buckets, + // and naturally cannot be colocate + Map, List> scanNodeWithJoinConjuncts = Maps.newHashMap(); + for (BinaryPredicate eqJoinPredicate : node.getEqJoinConjuncts()) { + OlapScanNode leftScanNode = genSrcScanNode(eqJoinPredicate.getChild(0), leftChildFragment, cannotReason); + if (leftScanNode == null) { + return false; + } + OlapScanNode rightScanNode = genSrcScanNode(eqJoinPredicate.getChild(1), rightChildFragment, cannotReason); + if (rightScanNode == null) { + return false; + } + Pair eqPair = new Pair<>(leftScanNode, rightScanNode); + List predicateList = scanNodeWithJoinConjuncts.get(eqPair); + if (predicateList == null) { + predicateList = Lists.newArrayList(); + scanNodeWithJoinConjuncts.put(eqPair, predicateList); + } + predicateList.add(eqJoinPredicate); } - PlanNode leftRoot = leftChildFragment.getPlanRoot(); - PlanNode rightRoot = rightChildFragment.getPlanRoot(); + // Condition4 + return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts, cannotReason); + } - //leftRoot should be ScanNode or HashJoinNode, rightRoot should be ScanNode - if (leftRoot instanceof OlapScanNode && rightRoot instanceof OlapScanNode) { - return canColocateJoin(node, leftRoot, rightRoot, cannotReason); + private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, List cannotReason) { + SlotRef slotRef = expr.getSrcSlotRef(); + if (slotRef == null) { + cannotReason.add(DistributedPlanColocateRule.TRANSFORMED_SRC_COLUMN); + return null; } + ScanNode scanNode = planFragment.getPlanRoot() + .getScanNodeInOneFragmentByTupleId(slotRef.getDesc().getParent().getId()); + if (scanNode == null) { + cannotReason.add(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA); + return null; + } + if (scanNode instanceof OlapScanNode) { + return ((OlapScanNode) scanNode); + } else { + cannotReason.add(DistributedPlanColocateRule.SUPPORT_ONLY_OLAP_TABLE); + return null; + } + } - if (leftRoot instanceof HashJoinNode && rightRoot instanceof OlapScanNode) { - while (leftRoot instanceof HashJoinNode) { - if (!((HashJoinNode)leftRoot).isShuffleJoin()) { - leftRoot = leftRoot.getChild(0); - } else { - cannotReason.add("left hash join node can not do colocate"); - return false; + private boolean dataDistributionMatchEqPredicate(Map, List> scanNodeWithJoinConjuncts, + List cannotReason) { + // If left table and right table is same table and they select same single partition or no partition + // they are naturally colocate relationship no need to check colocate group + for (Map.Entry, List> entry : scanNodeWithJoinConjuncts.entrySet()) { + OlapScanNode leftScanNode = entry.getKey().first; + OlapScanNode rightScanNode = entry.getKey().second; + List eqPredicates = entry.getValue(); + if (!dataDistributionMatchEqPredicate(eqPredicates, leftScanNode, rightScanNode, cannotReason)) { + return false; + } + } + return true; + } + + + //the table must be colocate + //the colocate group must be stable + //the eqJoinConjuncts must contain the distributionColumns + private boolean dataDistributionMatchEqPredicate(List eqJoinPredicates, OlapScanNode leftRoot, + OlapScanNode rightRoot, List cannotReason) { + OlapTable leftTable = leftRoot.getOlapTable(); + OlapTable rightTable = rightRoot.getOlapTable(); + + // if left table and right table is same table and they select same single partition or no partition + // they are naturally colocate relationship no need to check colocate group + Collection leftPartitions = leftRoot.getSelectedPartitionIds(); + Collection rightPartitions = rightRoot.getSelectedPartitionIds(); + boolean noNeedCheckColocateGroup = (leftTable.getId() == rightTable.getId()) && (leftPartitions.equals(rightPartitions)) && + (leftPartitions.size() <= 1); + + if (!noNeedCheckColocateGroup) { + ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); + + //1 the table must be colocate + if (!colocateIndex.isSameGroup(leftTable.getId(), rightTable.getId())) { + cannotReason.add(DistributedPlanColocateRule.TABLE_NOT_IN_THE_SAME_GROUP); + return false; + } + + //2 the colocate group must be stable + GroupId groupId = colocateIndex.getGroup(leftTable.getId()); + if (colocateIndex.isGroupUnstable(groupId)) { + cannotReason.add(DistributedPlanColocateRule.COLOCATE_GROUP_IS_NOT_STABLE); + return false; + } + } + + DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo(); + DistributionInfo rightDistribution = rightTable.getDefaultDistributionInfo(); + + if (leftDistribution instanceof HashDistributionInfo && rightDistribution instanceof HashDistributionInfo) { + List leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns(); + List rightDistributeColumns = ((HashDistributionInfo) rightDistribution).getDistributionColumns(); + + List leftJoinColumns = new ArrayList<>(); + List rightJoinColumns = new ArrayList<>(); + for (BinaryPredicate eqJoinPredicate : eqJoinPredicates) { + SlotRef lhsSlotRef = eqJoinPredicate.getChild(0).getSrcSlotRef(); + SlotRef rhsSlotRef = eqJoinPredicate.getChild(1).getSrcSlotRef(); + Preconditions.checkState(lhsSlotRef != null); + Preconditions.checkState(rhsSlotRef != null); + + Column leftColumn = lhsSlotRef.getDesc().getColumn(); + Column rightColumn = rhsSlotRef.getDesc().getColumn(); + int leftColumnIndex = leftDistributeColumns.indexOf(leftColumn); + int rightColumnIndex = rightDistributeColumns.indexOf(rightColumn); + + // eqjoinConjuncts column should have the same order like colocate distribute column + if (leftColumnIndex == rightColumnIndex && leftColumnIndex != -1) { + leftJoinColumns.add(leftColumn); + rightJoinColumns.add(rightColumn); } } - if (leftRoot instanceof OlapScanNode) { - return canColocateJoin(node, leftRoot, rightRoot, cannotReason); + + //3 the join columns should contains all distribute columns to enable colocate join + if (leftJoinColumns.containsAll(leftDistributeColumns) + && rightJoinColumns.containsAll(rightDistributeColumns)) { + return true; } } - cannotReason.add("Node type not match"); + cannotReason.add(DistributedPlanColocateRule.INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY); return false; } private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment, - List rhsHashExprs) { + List rhsHashExprs) { if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) { return false; } @@ -538,7 +646,6 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr } // 2.leftRoot be hashjoin node and not shuffle join - PlanNode rightRoot = rightChildFragment.getPlanRoot(); if (leftRoot instanceof HashJoinNode) { while (leftRoot instanceof HashJoinNode) { if (!((HashJoinNode)leftRoot).isShuffleJoin()) { @@ -557,7 +664,7 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr //the join expr must contian left table distribute column private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot, - List rhsJoinExprs) { + List rhsJoinExprs) { OlapScanNode leftScanNode = ((OlapScanNode) leftRoot); OlapTable leftTable = leftScanNode.getOlapTable(); @@ -566,7 +673,7 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot, ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); if (!leftTable.isColocateTable() || colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTable.getId()))) - return false; + return false; } DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo(); @@ -608,80 +715,6 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot, return true; } - //the table must be colocate - //the colocate group must be stable - //the eqJoinConjuncts must contain the distributionColumns - private boolean canColocateJoin(HashJoinNode node, PlanNode leftRoot, PlanNode rightRoot, - List cannotReason) { - OlapTable leftTable = ((OlapScanNode) leftRoot).getOlapTable(); - OlapTable rightTable = ((OlapScanNode) rightRoot).getOlapTable(); - - // if left table and right table is same table and they select same single partition or no partition - // they are naturally colocate relationship no need to check colocate group - Collection leftPartitions = ((OlapScanNode)leftRoot).getSelectedPartitionIds(); - Collection rightPartitions = ((OlapScanNode)rightRoot).getSelectedPartitionIds(); - boolean noNeedCheckColocateGroup = (leftTable.getId() == rightTable.getId()) && (leftPartitions.equals(rightPartitions)) && - (leftPartitions.size() <= 1); - - if (!noNeedCheckColocateGroup) { - ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); - - //1 the table must be colocate - if (!colocateIndex.isSameGroup(leftTable.getId(), rightTable.getId())) { - cannotReason.add("table not in the same group"); - return false; - } - - //2 the colocate group must be stable - GroupId groupId = colocateIndex.getGroup(leftTable.getId()); - if (colocateIndex.isGroupUnstable(groupId)) { - cannotReason.add("group is not stable"); - return false; - } - } - - DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo(); - DistributionInfo rightDistribution = rightTable.getDefaultDistributionInfo(); - - if (leftDistribution instanceof HashDistributionInfo && rightDistribution instanceof HashDistributionInfo) { - List leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns(); - List rightDistributeColumns = ((HashDistributionInfo) rightDistribution).getDistributionColumns(); - - List leftJoinColumns = new ArrayList<>(); - List rightJoinColumns = new ArrayList<>(); - List eqJoinConjuncts = node.getEqJoinConjuncts(); - for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) { - Expr lhsJoinExpr = eqJoinPredicate.getChild(0); - Expr rhsJoinExpr = eqJoinPredicate.getChild(1); - if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) { - continue; - } - - SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc(); - SlotDescriptor rightSlot = rhsJoinExpr.unwrapSlotRef().getDesc(); - - Column leftColumn = leftSlot.getColumn(); - Column rightColumn = rightSlot.getColumn(); - int leftColumnIndex = leftDistributeColumns.indexOf(leftColumn); - int rightColumnIndex = rightDistributeColumns.indexOf(rightColumn); - - // eqjoinConjuncts column should have the same order like colocate distribute column - if (leftColumnIndex == rightColumnIndex && leftColumnIndex != -1) { - leftJoinColumns.add(leftSlot.getColumn()); - rightJoinColumns.add(rightSlot.getColumn()); - } - } - - //3 the join columns should contains all distribute columns to enable colocate join - if (leftJoinColumns.containsAll(leftDistributeColumns) && rightJoinColumns.containsAll(rightDistributeColumns)) { - return true; - } - } - - cannotReason.add("column not match"); - return false; - } - /** * Modifies the leftChildFragment to execute a cross join. The right child input is provided by an ExchangeNode, * which is the destination of the rightChildFragment's output. @@ -812,6 +845,7 @@ private PlanFragment createSetOperationNodeFragment( } // There is at least one partitioned child fragment. + // TODO(ML): here PlanFragment setOperationFragment = new PlanFragment(ctx_.getNextFragmentId(), setOperationNode, DataPartition.RANDOM); for (int i = 0; i < childFragments.size(); ++i) { @@ -932,9 +966,9 @@ private PlanFragment createAggregationFragment( if (isDistinct) { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { - // Check table's distribution. See #4481. PlanNode childPlan = childFragment.getPlanRoot(); + // TODO(ML): here if (childPlan instanceof OlapScanNode && ((OlapScanNode) childPlan).getOlapTable().meetAggDistributionRequirements(node.getAggInfo())) { childFragment.addPlanRoot(node); @@ -1087,7 +1121,7 @@ private PlanFragment createPhase2DistinctAggregationFragment( partitionExprs == null ? DataPartition.UNPARTITIONED : DataPartition.hashPartitioned(partitionExprs); // Convert the existing node to a preaggregation. AggregationNode preaggNode = (AggregationNode)node.getChild(0); - + preaggNode.setIsPreagg(ctx_); // place a merge aggregation step for the 1st phase in a new fragment @@ -1175,8 +1209,9 @@ private PlanFragment createAnalyticFragment( // required if the sort partition exprs reference a tuple that is made nullable in // 'childFragment' to bring NULLs from outer-join non-matches together. DataPartition sortPartition = sortNode.getInputPartition(); + // TODO(ML): here if (!childFragment.getDataPartition().equals(sortPartition)) { - // TODO(zc) || childFragment.refsNullableTupleId(sortPartition.getPartitionExprs())) { + // TODO(zc) || childFragment.refsNullableTupleId(sortPartition.getPartitionExprs())) { analyticFragment = createParentFragment(childFragment, sortNode.getInputPartition()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 93ed8cf804a6c1..2360d5c7dbba7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -60,7 +60,7 @@ public class HashJoinNode extends PlanNode { private List eqJoinConjuncts = Lists.newArrayList(); // join conjuncts from the JOIN clause that aren't equi-join predicates private List otherJoinConjuncts; - private boolean isPushDown; + private boolean isPushDown = false; private DistributionMode distrMode; private boolean isColocate = false; //the flag for colocate join private String colocateReason = ""; // if can not do colocate join, set reason here @@ -85,7 +85,9 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef inne this.otherJoinConjuncts = otherJoinConjuncts; children.add(outer); children.add(inner); - this.isPushDown = false; + if (this.joinOp.isInnerJoin() || this.joinOp.isLeftSemiJoin()) { + this.isPushDown = true; + } // Inherits all the nullable tuple from the children // Mark tuples that form the "nullable" side of the outer join as nullable. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 1f4b4dc69c2512..f263c458655b41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -645,4 +645,23 @@ public String toString() { sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF)); return sb.toString(); } + + public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) { + if (this instanceof ScanNode) { + if (tupleIds.contains(tupleId)) { + return (ScanNode) this; + } + return null; + } else if (this instanceof ExchangeNode) { + return null; + } else { + for (PlanNode planNode : children) { + ScanNode scanNode = planNode.getScanNodeInOneFragmentByTupleId(tupleId); + if (scanNode != null) { + return scanNode; + } + } + return null; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1081ebc268b406..7b8e88023f8473 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1096,16 +1096,12 @@ private void computeFragmentHosts() throws Exception { // One fragment could only have one HashJoinNode private boolean isColocateJoin(PlanNode node) { - if (Config.disable_colocate_join) { - return false; - } - // TODO(cmy): some internal process, such as broker load task, do not have ConnectContext. // Any configurations needed by the Coordinator should be passed in Coordinator initialization. // Refine this later. // Currently, just ignore the session variables if ConnectContext does not exist if (ConnectContext.get() != null) { - if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) { + if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 40bc2bf9cfbf0d..9ede53e1da5cd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -72,7 +72,7 @@ public class SessionVariable implements Serializable, Writable { public static final int MIN_EXEC_MEM_LIMIT = 2097152; public static final String BATCH_SIZE = "batch_size"; public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations"; - public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join"; + public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan"; public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join"; public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; @@ -235,8 +235,8 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DISABLE_STREAMING_PREAGGREGATIONS) public boolean disableStreamPreaggregations = false; - @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN) - public boolean disableColocateJoin = false; + @VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN) + public boolean disableColocatePlan = false; @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN) public boolean enableBucketShuffleJoin = true; @@ -445,8 +445,8 @@ public void setResourceGroup(String resourceGroup) { this.resourceGroup = resourceGroup; } - public boolean isDisableColocateJoin() { - return disableColocateJoin; + public boolean isDisableColocatePlan() { + return disableColocatePlan; } public boolean isEnableBucketShuffleJoin() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 470dfb525b1f37..2a2ed2b55c4d47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -23,6 +23,7 @@ import org.apache.doris.cluster.Cluster; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.common.Status; @@ -31,10 +32,6 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageMedium; -import org.apache.commons.validator.routines.InetAddressValidator; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -44,6 +41,10 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import org.apache.commons.validator.routines.InetAddressValidator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -772,7 +773,7 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA // host -> BE list Map> backendMaps = Maps.newHashMap(); for (Backend backend : srcBackends) { - if (backendMaps.containsKey(backend.getHost())){ + if (backendMaps.containsKey(backend.getHost())) { backendMaps.get(backend.getHost()).add(backend); } else { List list = Lists.newArrayList(); @@ -781,11 +782,16 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA } } + // if more than one backend exists in same host, select a backend at random List backends = Lists.newArrayList(); for (List list : backendMaps.values()) { - Collections.shuffle(list); - backends.add(list.get(0)); + if (FeConstants.runningUnitTest) { + backends.addAll(list); + } else { + Collections.shuffle(list); + backends.add(list.get(0)); + } } Collections.shuffle(backends); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java similarity index 99% rename from fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java rename to fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java index 0aae3d0350d6cb..3b5695affd328e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java @@ -45,8 +45,8 @@ import java.util.Map; import java.util.Set; -public class ColocateTableBalancerTest { - private ColocateTableBalancer balancer = ColocateTableBalancer.getInstance(); +public class ColocateTableCheckerAndBalancerTest { + private ColocateTableCheckerAndBalancer balancer = ColocateTableCheckerAndBalancer.getInstance(); private Backend backend1; private Backend backend2; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java new file mode 100644 index 00000000000000..b9f268a9b4c6ca --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.utframe.UtFrameUtils; + +import org.apache.commons.lang.StringUtils; + +import java.util.UUID; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ColocatePlanTest { + private static final String COLOCATE_ENABLE = "colocate: true"; + private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/"; + private static ConnectContext ctx; + + @Before + public void setUp() throws Exception { + FeConstants.runningUnitTest = true; + UtFrameUtils.createMinDorisCluster(runningDir, 2); + ctx = UtFrameUtils.createDefaultCtx(); + String createDbStmtStr = "create database db1;"; + CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx); + Catalog.getCurrentCatalog().createDb(createDbStmt); + // create table test_colocate (k1 int ,k2 int, k3 int, k4 int) + // distributed by hash(k1, k2) buckets 10 + // properties ("replication_num" = "2"); + String createTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) " + + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2');"; + CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); + Catalog.getCurrentCatalog().createTable(createTableStmt); + } + + // without + // 1. agg: group by column < distributed columns + // 2. join: src data has been redistributed + @Test + public void sqlDistributedSmallerThanData1() throws Exception { + String sql = "explain select * from (select k1 from db1.test_colocate group by k1) a , db1.test_colocate b " + + "where a.k1=b.k1"; + String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA)); + } + + // without : join column < distributed columns; + @Test + public void sqlDistributedSmallerThanData2() throws Exception { + String sql = "explain select * from (select k1 from db1.test_colocate group by k1, k2) a , db1.test_colocate b " + + "where a.k1=b.k1"; + String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY)); + } + + // with: + // 1. agg columns = distributed columns + // 2. hash columns = agg output columns = distributed columns + @Test + public void sqlAggAndJoinSameAsTableMeta() throws Exception { + String sql = "explain select * from (select k1, k2 from db1.test_colocate group by k1, k2) a , db1.test_colocate b " + + "where a.k1=b.k1 and a.k2=b.k2"; + String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); + } + + // with: + // 1. agg columns > distributed columns + // 2. hash columns = agg output columns > distributed columns + @Test + public void sqlAggAndJoinMoreThanTableMeta() throws Exception { + String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , " + + "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 and a.k3=b.k3"; + String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); + } + + // with: + // 1. agg columns > distributed columns + // 2. hash columns = distributed columns + @Test + public void sqlAggMoreThanTableMeta() throws Exception { + String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , " + + "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2"; + String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql); + Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE")); + Assert.assertTrue(plan1.contains(COLOCATE_ENABLE)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index ab34e204893ae2..7b4f6de3f6ad92 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -150,10 +150,22 @@ public static int startFEServer(String runningDir) throws EnvVarNotSetException, return fe_rpc_port; } - public static void createMinDorisCluster(String runningDir) throws EnvVarNotSetException, IOException, + public static void createMinDorisCluster(String runningDir) throws InterruptedException, NotInitException, + IOException, DdlException, EnvVarNotSetException, FeStartException { + createMinDorisCluster(runningDir, 1); + } + + public static void createMinDorisCluster(String runningDir, int backendNum) throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, InterruptedException { int fe_rpc_port = startFEServer(runningDir); + for (int i = 0; i < backendNum; i++) { + createBackend(fe_rpc_port); + // sleep to wait first heartbeat + Thread.sleep(6000); + } + } + public static void createBackend(int fe_rpc_port) throws IOException, InterruptedException { int be_heartbeat_port = findValidPort(); int be_thrift_port = findValidPort(); int be_brpc_port = findValidPort(); @@ -168,9 +180,9 @@ public static void createMinDorisCluster(String runningDir) throws EnvVarNotSetE backend.start(); // add be - Backend be = new Backend(10001, backend.getHost(), backend.getHeartbeatPort()); + Backend be = new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort()); Map disks = Maps.newHashMap(); - DiskInfo diskInfo1 = new DiskInfo("/path1"); + DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); diskInfo1.setTotalCapacityB(1000000); diskInfo1.setAvailableCapacityB(500000); diskInfo1.setDataUsedCapacityB(480000); @@ -179,9 +191,6 @@ public static void createMinDorisCluster(String runningDir) throws EnvVarNotSetE be.setAlive(true); be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); Catalog.getCurrentSystemInfo().addBackend(be); - - // sleep to wait first heartbeat - Thread.sleep(6000); } public static void cleanDorisFeDir(String baseDir) { From a98b6a890a15ae096d1fbb053dacb9d6609b54fe Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Mon, 22 Mar 2021 11:20:57 +0800 Subject: [PATCH 2/6] Changed --- .../apache/doris/planner/DistributedPlanner.java | 10 ++++------ .../java/org/apache/doris/planner/PlanNode.java | 13 ++++--------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 592974c8511e0a..4f487efad5deac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -543,10 +543,8 @@ private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, List, List> scanNodeWithJoinConjuncts, - List cannotReason) { - // If left table and right table is same table and they select same single partition or no partition - // they are naturally colocate relationship no need to check colocate group + private boolean dataDistributionMatchEqPredicate(Map, + List> scanNodeWithJoinConjuncts, List cannotReason) { for (Map.Entry, List> entry : scanNodeWithJoinConjuncts.entrySet()) { OlapScanNode leftScanNode = entry.getKey().first; OlapScanNode rightScanNode = entry.getKey().second; @@ -571,8 +569,8 @@ private boolean dataDistributionMatchEqPredicate(List eqJoinPre // they are naturally colocate relationship no need to check colocate group Collection leftPartitions = leftRoot.getSelectedPartitionIds(); Collection rightPartitions = rightRoot.getSelectedPartitionIds(); - boolean noNeedCheckColocateGroup = (leftTable.getId() == rightTable.getId()) && (leftPartitions.equals(rightPartitions)) && - (leftPartitions.size() <= 1); + boolean noNeedCheckColocateGroup = (leftTable.getId() == rightTable.getId()) + && (leftPartitions.equals(rightPartitions)) && (leftPartitions.size() <= 1); if (!noNeedCheckColocateGroup) { ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index f263c458655b41..b0c33da2a158af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -647,21 +647,16 @@ public String toString() { } public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) { - if (this instanceof ScanNode) { - if (tupleIds.contains(tupleId)) { - return (ScanNode) this; - } - return null; - } else if (this instanceof ExchangeNode) { - return null; - } else { + if (this instanceof ScanNode && tupleIds.contains(tupleId)) { + return (ScanNode) this; + } else if (!(this instanceof ExchangeNode)) { for (PlanNode planNode : children) { ScanNode scanNode = planNode.getScanNodeInOneFragmentByTupleId(tupleId); if (scanNode != null) { return scanNode; } } - return null; } + return null; } } From f251b38bcb3bee1bee25fb0326ad1f6b3006798c Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Tue, 23 Mar 2021 12:39:04 +0800 Subject: [PATCH 3/6] fix unit test --- .../java/org/apache/doris/analysis/Expr.java | 2 +- .../org/apache/doris/analysis/ExprTest.java | 28 +++++++++++++++---- .../doris/catalog/ColocateTableTest.java | 3 +- .../apache/doris/planner/QueryPlanTest.java | 2 +- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 920a54fdc65be0..662f316d6f2f5b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1348,7 +1348,7 @@ public SlotRef getSrcSlotRef() { } List sourceExpr = slotDescriptor.getSourceExprs(); if (sourceExpr == null || sourceExpr.isEmpty()) { - return (SlotRef) this; + return unwrapSloRef; } if (sourceExpr.size() > 1) { return null; diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ExprTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ExprTest.java index 360a5cfc45e2b3..7904b924859fb2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ExprTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ExprTest.java @@ -24,17 +24,16 @@ import com.google.common.collect.Maps; -import org.junit.Assert; -import org.junit.Test; - +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.List; -import java.util.ArrayList; import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; +import org.junit.Assert; +import org.junit.Test; public class ExprTest { @@ -184,4 +183,23 @@ public void testEqualSets() { list2.add(r4); Assert.assertFalse(Expr.equalSets(list1, list2)); } + + @Test + public void testSrcSlotRef(@Injectable SlotDescriptor slotDescriptor) { + TableName tableName = new TableName("db1", "table1"); + SlotRef slotRef = new SlotRef(tableName, "c1"); + slotRef.setDesc(slotDescriptor); + Deencapsulation.setField(slotRef, "isAnalyzed", true); + Expr castExpr = new CastExpr(new TypeDef(Type.INT), slotRef); + new Expectations() { + { + slotDescriptor.getSourceExprs(); + result = null; + } + }; + + SlotRef srcSlotRef = castExpr.getSrcSlotRef(); + Assert.assertTrue(srcSlotRef != null); + Assert.assertTrue(srcSlotRef == slotRef); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java index 483832e0ff5e3c..3accf552267889 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ColocateTableTest.java @@ -121,8 +121,7 @@ public void testCreateOneTable() throws Exception { GroupId groupId = index.getGroup(tableId); List backendIds = index.getBackendsPerBucketSeq(groupId).get(0); - System.out.println(backendIds); - Assert.assertEquals(Collections.singletonList(10001L), backendIds); + Assert.assertEquals(1, backendIds.size()); String fullGroupName = dbId + "_" + groupName; Assert.assertEquals(tableId, index.getTableIdByGroup(fullGroupName)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 613aa300cf0b8a..c6bd97ca943634 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1286,7 +1286,7 @@ public void testOdbcSink() throws Exception { @Test public void testPreferBroadcastJoin() throws Exception { connectContext.setDatabase("default_cluster:test"); - String queryStr = "explain select * from (select k1 from jointest group by k1)t2, jointest t1 where t1.k1 = t2.k1"; + String queryStr = "explain select * from (select k2 from jointest group by k2)t2, jointest t1 where t1.k1 = t2.k2"; // default set PreferBroadcastJoin true String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); From 44a8e1477baa959f34614103ac091d42e4f8f26a Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 24 Mar 2021 11:14:56 +0800 Subject: [PATCH 4/6] Fix ut --- .../doris/planner/ColocatePlanTest.java | 15 ++++-- .../apache/doris/utframe/AnotherDemoTest.java | 46 +------------------ 2 files changed, 14 insertions(+), 47 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index b9f268a9b4c6ca..8ff49e32364635 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -26,10 +26,12 @@ import org.apache.commons.lang.StringUtils; +import java.io.File; import java.util.UUID; +import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; public class ColocatePlanTest { @@ -37,8 +39,8 @@ public class ColocatePlanTest { private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/"; private static ConnectContext ctx; - @Before - public void setUp() throws Exception { + @BeforeClass + public static void setUp() throws Exception { FeConstants.runningUnitTest = true; UtFrameUtils.createMinDorisCluster(runningDir, 2); ctx = UtFrameUtils.createDefaultCtx(); @@ -54,6 +56,13 @@ public void setUp() throws Exception { Catalog.getCurrentCatalog().createTable(createTableStmt); } + + @AfterClass + public static void tearDown() { + File file = new File(runningDir); + file.delete(); + } + // without // 1. agg: group by column < distributed columns // 2. join: src data has been redistributed diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java index 5ea6919fea8ddb..a84a2e53b70eb1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java @@ -82,54 +82,12 @@ public class AnotherDemoTest { public static void beforeClass() throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, InterruptedException { FeConstants.default_scheduler_interval_millisecond = 10; - // get DORIS_HOME - String dorisHome = System.getenv("DORIS_HOME"); - if (Strings.isNullOrEmpty(dorisHome)) { - dorisHome = Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString(); - } - - getPorts(); - - // start fe in "DORIS_HOME/fe/mocked/" - MockedFrontend frontend = MockedFrontend.getInstance(); - Map feConfMap = Maps.newHashMap(); - // set additional fe config - feConfMap.put("http_port", String.valueOf(fe_http_port)); - feConfMap.put("rpc_port", String.valueOf(fe_rpc_port)); - feConfMap.put("query_port", String.valueOf(fe_query_port)); - feConfMap.put("edit_log_port", String.valueOf(fe_edit_log_port)); - feConfMap.put("tablet_create_timeout_second", "10"); - frontend.init(dorisHome + "/" + runningDir, feConfMap); - frontend.start(new String[0]); - - // start be - MockedBackend backend = MockedBackendFactory.createBackend("127.0.0.1", - be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port, - new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); - backend.setFeAddress(new TNetworkAddress("127.0.0.1", frontend.getRpcPort())); - backend.start(); - - // add be - Backend be = new Backend(10001, backend.getHost(), backend.getHeartbeatPort()); - Map disks = Maps.newHashMap(); - DiskInfo diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(1000000); - diskInfo1.setAvailableCapacityB(500000); - diskInfo1.setDataUsedCapacityB(480000); - disks.put(diskInfo1.getRootPath(), diskInfo1); - be.setDisks(ImmutableMap.copyOf(disks)); - be.setAlive(true); - be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); - Catalog.getCurrentSystemInfo().addBackend(be); - - // sleep to wait first heartbeat - Thread.sleep(6000); + UtFrameUtils.createMinDorisCluster(runningDir, 1); } @AfterClass public static void TearDown() { - UtFrameUtils.cleanDorisFeDir(runningDirBase); + UtFrameUtils.cleanDorisFeDir(runningDir); } // generate all port from valid ports From 0d817bbf1933456926a27776aaf82849e706e3bc Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 25 Mar 2021 14:59:41 +0800 Subject: [PATCH 5/6] Add comment --- .../src/main/java/org/apache/doris/analysis/Expr.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 662f316d6f2f5b..1929d9522d2142 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1337,6 +1337,14 @@ public String toString() { selectivity).add("#distinct", numDistinctValues).add("scale", outputScale).toString(); } + /** + * This method is mainly used to find the original column corresponding to the current expr. + * Find the initial slotRef from the current slot ref. + * + * If the initial expr is not a slotRef, it returns null directly. + * If the current slotRef comes from another expression transformation, + * rather than directly from another slotRef, null will also be returned. + */ public SlotRef getSrcSlotRef() { SlotRef unwrapSloRef = this.unwrapSlotRef(); if (unwrapSloRef == null) { From 12e1b8b64620a3214181e9e70e6b9f1894ead65c Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 8 Apr 2021 12:20:23 +0800 Subject: [PATCH 6/6] Remove todo --- .../java/org/apache/doris/planner/DistributedPlanner.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 4f487efad5deac..9aeaba9b2f7214 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -312,7 +312,6 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ List reason = Lists.newArrayList(); if (canColocateJoin(node, leftChildFragment, rightChildFragment, reason)) { node.setColocate(true, ""); - //node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); node.setChild(0, leftChildFragment.getPlanRoot()); node.setChild(1, rightChildFragment.getPlanRoot()); leftChildFragment.setPlanRoot(node); @@ -476,7 +475,7 @@ && isBroadcastCostSmaller(broadcastCost, partitionCost) /** * Colocate Join can be performed when the following 4 conditions are met at the same time. - * 1. Session variables disable_colocate_plan = true + * 1. Session variables disable_colocate_plan = false * 2. There is no join hints in HashJoinNode * 3. There are no exchange node between source scan node and HashJoinNode. * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched. @@ -536,7 +535,7 @@ private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, List