From 0e4765cbe0b8c83036292ed4c936de241fc56caa Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 11 Mar 2021 20:14:59 +0800 Subject: [PATCH 1/3] [Colocate plan][Step1] Colocate join covers more situations The old colocate join can only cover the case where the child is hash or scan. In fact, as long as the child's data distribution meets the requirements, no matter what the plan node on the child node is, a colocate join can be performed. --- .../apache/doris/planner/DistributedPlanner.java | 13 +++++++++++-- .../java/org/apache/doris/planner/PlanNode.java | 2 +- .../org/apache/doris/planner/ColocatePlanTest.java | 1 - 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 9aeaba9b2f7214..aac228c11e95e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -475,7 +475,11 @@ && isBroadcastCostSmaller(broadcastCost, partitionCost) /** * Colocate Join can be performed when the following 4 conditions are met at the same time. +<<<<<<< HEAD * 1. Session variables disable_colocate_plan = false +======= + * 1. Session variables disable_colocate_plan = true +>>>>>>> 26f4e1fa8 ([Colocate plan][Step1] Colocate join covers more situations) * 2. There is no join hints in HashJoinNode * 3. There are no exchange node between source scan node and HashJoinNode. * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched. @@ -542,8 +546,10 @@ private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, List, - List> scanNodeWithJoinConjuncts, List cannotReason) { + private boolean dataDistributionMatchEqPredicate(Map, List> scanNodeWithJoinConjuncts, + List cannotReason) { + // If left table and right table is same table and they select same single partition or no partition + // they are naturally colocate relationship no need to check colocate group for (Map.Entry, List> entry : scanNodeWithJoinConjuncts.entrySet()) { OlapScanNode leftScanNode = entry.getKey().first; OlapScanNode rightScanNode = entry.getKey().second; @@ -842,6 +848,7 @@ private PlanFragment createSetOperationNodeFragment( } // There is at least one partitioned child fragment. + // TODO(ML): here PlanFragment setOperationFragment = new PlanFragment(ctx_.getNextFragmentId(), setOperationNode, DataPartition.RANDOM); for (int i = 0; i < childFragments.size(); ++i) { @@ -964,6 +971,7 @@ private PlanFragment createAggregationFragment( } else { // Check table's distribution. See #4481. PlanNode childPlan = childFragment.getPlanRoot(); + // TODO(ML): here if (childPlan instanceof OlapScanNode && ((OlapScanNode) childPlan).getOlapTable().meetAggDistributionRequirements(node.getAggInfo())) { childFragment.addPlanRoot(node); @@ -1204,6 +1212,7 @@ private PlanFragment createAnalyticFragment( // required if the sort partition exprs reference a tuple that is made nullable in // 'childFragment' to bring NULLs from outer-join non-matches together. DataPartition sortPartition = sortNode.getInputPartition(); + // TODO(ML): here if (!childFragment.getDataPartition().equals(sortPartition)) { // TODO(zc) || childFragment.refsNullableTupleId(sortPartition.getPartitionExprs())) { analyticFragment = createParentFragment(childFragment, sortNode.getInputPartition()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index b0c33da2a158af..dd7f6363984b5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -645,7 +645,7 @@ public String toString() { sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF)); return sb.toString(); } - + public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) { if (this instanceof ScanNode && tupleIds.contains(tupleId)) { return (ScanNode) this; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 8ff49e32364635..5c28b224f3d042 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -56,7 +56,6 @@ public static void setUp() throws Exception { Catalog.getCurrentCatalog().createTable(createTableStmt); } - @AfterClass public static void tearDown() { File file = new File(runningDir); From 5d161a9568a8c1c17b0d4ca94c60940380d39f75 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 17 Mar 2021 15:01:56 +0800 Subject: [PATCH 2/3] [Colocate plan][Step2] Colocate aggregation covers more situations The old colocate aggregation can only cover the case where the child is scan. In fact, as long as the child's data distribution meets the requirements, no matter what the plan node on the child node is, a colocate aggregation can be performed. This PR also fixes the correct data partition attribute of fragment. The data partition of fragment which contains scan node is Hash Partition rather than Random. This modification is mainly to determine the possibility of colocate through the correct distribution of child fragments. --- .../apache/doris/analysis/AggregateInfo.java | 12 ++- .../java/org/apache/doris/analysis/Expr.java | 21 +++++ .../org/apache/doris/analysis/SlotRef.java | 37 +++++++++ .../apache/doris/planner/DataPartition.java | 4 +- .../doris/planner/DistributedPlanner.java | 76 +++++++++++++++++-- .../apache/doris/planner/OlapScanNode.java | 14 +++- .../apache/doris/planner/PlanFragment.java | 32 +++++++- .../doris/planner/OlapScanNodeTest.java | 34 ++++++--- 8 files changed, 208 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java index 04b047433bd1e0..3e14d8964f6b38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java @@ -807,8 +807,16 @@ public String debugString() { } @Override - protected String tupleDebugName() { return "agg-tuple"; } + protected String tupleDebugName() { + return "agg-tuple"; + } @Override - public AggregateInfo clone() { return new AggregateInfo(this); } + public AggregateInfo clone() { + return new AggregateInfo(this); + } + + public List getInputPartitionExprs() { + return partitionExprs_ != null ? partitionExprs_ : groupingExprs_; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 1929d9522d2142..3f73a31c8dc652 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1364,6 +1364,27 @@ public SlotRef getSrcSlotRef() { return sourceExpr.get(0).getSrcSlotRef(); } + public boolean comeFrom(Expr srcExpr) { + SlotRef unwrapSloRef = this.unwrapSlotRef(); + if (unwrapSloRef == null) { + return false; + } + SlotRef unwrapSrcSlotRef = srcExpr.unwrapSlotRef(); + if (unwrapSrcSlotRef == null) { + return false; + } + if (unwrapSloRef.columnEqual(unwrapSrcSlotRef)) { + return true; + } + // check source expr + SlotDescriptor slotDescriptor = unwrapSloRef.getDesc(); + if (slotDescriptor == null || slotDescriptor.getSourceExprs() == null + || slotDescriptor.getSourceExprs().size() != 1) { + return false; + } + return slotDescriptor.getSourceExprs().get(0).comeFrom(unwrapSrcSlotRef); + } + /** * If 'this' is a SlotRef or a Cast that wraps a SlotRef, returns that SlotRef. * Otherwise returns null. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java index 57c492f44e7a0d..cd52ef6de07808 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -123,6 +123,43 @@ public void setDesc(SlotDescriptor desc) { this.desc = desc; } + public boolean columnEqual(Expr srcExpr) { + Preconditions.checkState(srcExpr instanceof SlotRef); + SlotRef srcSlotRef = (SlotRef) srcExpr; + if (desc != null && srcSlotRef.desc != null) { + return desc.getId().equals(srcSlotRef.desc.getId()); + } + TableName srcTableName = srcSlotRef.tblName; + if (srcTableName == null && srcSlotRef.desc != null) { + srcTableName = srcSlotRef.getTableName(); + } + TableName thisTableName = tblName; + if (thisTableName == null && desc != null) { + thisTableName = getTableName(); + } + if ((thisTableName == null) != (srcTableName == null)) { + return false; + } + if (thisTableName != null && !thisTableName.equals(srcTableName)) { + return false; + } + String srcColumnName = srcSlotRef.getColumnName(); + if (srcColumnName == null && srcSlotRef.desc != null && srcSlotRef.getDesc().getColumn() != null) { + srcColumnName = srcSlotRef.desc.getColumn().getName(); + } + String thisColumnName = getColumnName(); + if (thisColumnName == null && desc != null && desc.getColumn() != null) { + thisColumnName = desc.getColumn().getName(); + } + if ((thisColumnName == null) != (srcColumnName == null)) { + return false; + } + if (thisColumnName != null && !thisColumnName.toLowerCase().equals(srcColumnName.toLowerCase())) { + return false; + } + return true; + } + @Override public void vectorizedAnalyze(Analyzer analyzer) { computeOutputColumn(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 1a293abdbb054a..ec474562a7a506 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -24,12 +24,14 @@ import org.apache.doris.thrift.TDataPartition; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.logging.log4j.Logger; + import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index aac228c11e95e0..344a8106df873a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -275,10 +275,15 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment) private PlanFragment createScanFragment(PlanNode node) { if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); - } else if (node instanceof SchemaScanNode) { + } else if (node instanceof SchemaScanNode) { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); + } else if (node instanceof OlapScanNode) { + // olap scan node + OlapScanNode olapScanNode = (OlapScanNode) node; + return new PlanFragment(ctx_.getNextFragmentId(), node, + olapScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); } else { - // es scan node, olap scan node are random partitioned + // other scan nodes are random partitioned: es, broker return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM); } } @@ -850,7 +855,8 @@ private PlanFragment createSetOperationNodeFragment( // There is at least one partitioned child fragment. // TODO(ML): here PlanFragment setOperationFragment = new PlanFragment(ctx_.getNextFragmentId(), setOperationNode, - DataPartition.RANDOM); + new DataPartition(TPartitionType.HASH_PARTITIONED, + setOperationNode.getMaterializedResultExprLists_().get(0))); for (int i = 0; i < childFragments.size(); ++i) { PlanFragment childFragment = childFragments.get(i); /* if (childFragment.isPartitioned() && childFragment.getPlanRoot().getNumInstances() > 1) { @@ -969,11 +975,7 @@ private PlanFragment createAggregationFragment( if (isDistinct) { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { - // Check table's distribution. See #4481. - PlanNode childPlan = childFragment.getPlanRoot(); - // TODO(ML): here - if (childPlan instanceof OlapScanNode && - ((OlapScanNode) childPlan).getOlapTable().meetAggDistributionRequirements(node.getAggInfo())) { + if (canColocateAgg(node.getAggInfo(), childFragment.getInputDataPartition())) { childFragment.addPlanRoot(node); return childFragment; } else { @@ -982,6 +984,64 @@ private PlanFragment createAggregationFragment( } } + /** + * Colocate Agg can be performed when the following 2 conditions are met at the same time. + * 1. Session variables disable_colocate_plan = true + * 2. The input data partition of child fragment < agg node partition exprs + */ + private boolean canColocateAgg(AggregateInfo aggregateInfo, List childFragmentDataPartition) { + // Condition1 + if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { + LOG.info("Agg node is not colocate in:" + ConnectContext.get().getQueryDetail().getQueryId() + + ", reason:" + DistributedPlanColocateRule.SESSION_DISABLED); + return false; + } + + // Condition2 + List aggPartitionExprs = aggregateInfo.getInputPartitionExprs(); + for (DataPartition childDataPartition : childFragmentDataPartition) { + if (dataPartitionMatchAggInfo(childDataPartition, aggPartitionExprs)) { + return true; + } + } + return false; + } + + /** + * The aggPartitionExprs should contains all of data partition columns. + * Since aggPartitionExprs may be derived from the transformation of the lower tuple, + * it is necessary to find the source expr of itself firstly. + *

+ * For example: + * Data Partition: t1.k1, t1.k2 + * Agg Partition Exprs: t1.k1, t1.k2, t1.k3 + * Return: true + *

+ * Data Partition: t1.k1, t1.k2 + * Agg Partition Exprs: t1.k1, t2.k2 + * Return: false + */ + private boolean dataPartitionMatchAggInfo(DataPartition dataPartition, List aggPartitionExprs) { + TPartitionType partitionType = dataPartition.getType(); + if (partitionType != TPartitionType.HASH_PARTITIONED) { + return false; + } + List dataPartitionExprs = dataPartition.getPartitionExprs(); + for (Expr dataPartitionExpr : dataPartitionExprs) { + boolean match = false; + for (Expr aggPartitionExpr : aggPartitionExprs) { + if (aggPartitionExpr.comeFrom(dataPartitionExpr)) { + match = true; + break; + } + } + if (!match) { + return false; + } + } + return true; + } + private PlanFragment createRepeatNodeFragment( RepeatNode repeatNode, PlanFragment childFragment, ArrayList fragments) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 4d260ae6a238e2..6b1980f4abf57c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -775,7 +775,7 @@ private boolean isSlotRefNested(Expr expr) { return expr instanceof SlotRef; } - private void filterDeletedRows(Analyzer analyzer) throws AnalysisException{ + private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { if (!Util.showHiddenColumns() && olapTable.hasDeleteSign()) { SlotRef deleteSignSlot = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN); deleteSignSlot.analyze(analyzer); @@ -785,4 +785,16 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException{ conjuncts.add(conjunct); } } + + public DataPartition constructInputPartitionByDistributionInfo() { + DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); + Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); + List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + List dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return DataPartition.hashPartitioned(dataDistributeExprs); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 27ada66860a9e3..234e7d3bd8fe2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -29,6 +29,7 @@ import org.apache.doris.thrift.TResultSinkType; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; @@ -97,6 +98,17 @@ public class PlanFragment extends TreeNode { // TODO: improve this comment, "input" is a bit misleading private final DataPartition dataPartition; + // specification of the actually input partition of this fragment when transmitting to be. + // By default, the value of the data partition in planner and the data partition transmitted to be are the same. + // So this attribute is empty. + // But sometimes the planned value and the serialized value are inconsistent. You need to set this value. + // At present, this situation only occurs in the fragment where the scan node is located. + // Since the data partition expression of the scan node is actually constructed from the schema of the table, + // the expression is not analyzed. + // This will cause this expression to not be serialized correctly and transmitted to be. + // In this case, you need to set this attribute to DataPartition RANDOM to avoid the problem. + private DataPartition dataPartitionForThrift; + // specification of how the output of this fragment is partitioned (i.e., how // it's sent to its destination); // if the output is UNPARTITIONED, it is being broadcast @@ -128,6 +140,11 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) { setFragmentInPlanTree(planRoot); } + public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition, DataPartition partitionForThrift) { + this(id, root, partition); + this.dataPartitionForThrift = partitionForThrift; + } + /** * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node. * Does not traverse the children of ExchangeNodes because those must belong to a @@ -212,7 +229,11 @@ public TPlanFragment toThrift() { if (sink != null) { result.setOutputSink(sink.toThrift()); } - result.setPartition(dataPartition.toThrift()); + if (dataPartitionForThrift == null) { + result.setPartition(dataPartition.toThrift()); + } else { + result.setPartition(dataPartitionForThrift.toThrift()); + } // TODO chenhao , calculated by cost result.setMinReservationBytes(0); @@ -260,6 +281,15 @@ public void setDestination(ExchangeNode destNode) { dest.addChild(this); } + public List getInputDataPartition() { + List result = Lists.newArrayList(); + result.add(getDataPartition()); + for (PlanFragment child : children) { + result.add(child.getOutputPartition()); + } + return result; + } + public DataPartition getDataPartition() { return dataPartition; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java index f9f0f9362330fd..ebe7da33c75109 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java @@ -17,14 +17,6 @@ package org.apache.doris.planner; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; - import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IntLiteral; @@ -34,9 +26,18 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + public class OlapScanNodeTest { // columnA in (1) hashmode=3 @Test @@ -157,6 +158,21 @@ public void testHashForIntLiteral() { long hashValue = hashKey.getHashValue(); long mod = (int) ((hashValue & 0xffffffff) % 3); Assert.assertEquals(mod, 2); - } + } } + +// @Test +// public void testConstructInputPartitionByDistributionInfo(@Injectable OlapTable olapTable, +// @Injectable TupleDescriptor tupleDescriptor) { +// PlanNodeId planNodeId = new PlanNodeId(1); +// OlapScanNode olapScanNode = new OlapScanNode(planNodeId, tupleDescriptor, "scan node"); +// Deencapsulation.setField(olapScanNode, "olapTable", olapTable); +// new Expectations() { +// { +// olapTable.getDefaultDistributionInfo(); +// result = +// } +// }; +// olapScanNode.constructInputPartitionByDistributionInfo(); +// } } From 24e411eed2784ac722e821a3e0d7613b29f5b90e Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Tue, 27 Apr 2021 18:38:32 +0800 Subject: [PATCH 3/3] Remove conflict --- .../java/org/apache/doris/planner/DistributedPlanner.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 344a8106df873a..8f61a499a7bacf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -480,11 +480,7 @@ && isBroadcastCostSmaller(broadcastCost, partitionCost) /** * Colocate Join can be performed when the following 4 conditions are met at the same time. -<<<<<<< HEAD * 1. Session variables disable_colocate_plan = false -======= - * 1. Session variables disable_colocate_plan = true ->>>>>>> 26f4e1fa8 ([Colocate plan][Step1] Colocate join covers more situations) * 2. There is no join hints in HashJoinNode * 3. There are no exchange node between source scan node and HashJoinNode. * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched. @@ -986,13 +982,13 @@ private PlanFragment createAggregationFragment( /** * Colocate Agg can be performed when the following 2 conditions are met at the same time. - * 1. Session variables disable_colocate_plan = true + * 1. Session variables disable_colocate_plan = false * 2. The input data partition of child fragment < agg node partition exprs */ private boolean canColocateAgg(AggregateInfo aggregateInfo, List childFragmentDataPartition) { // Condition1 if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { - LOG.info("Agg node is not colocate in:" + ConnectContext.get().getQueryDetail().getQueryId() + LOG.debug("Agg node is not colocate in:" + ConnectContext.get().getQueryDetail().getQueryId() + ", reason:" + DistributedPlanColocateRule.SESSION_DISABLED); return false; }