diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java index 04b047433bd1e0..3e14d8964f6b38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java @@ -807,8 +807,16 @@ public String debugString() { } @Override - protected String tupleDebugName() { return "agg-tuple"; } + protected String tupleDebugName() { + return "agg-tuple"; + } @Override - public AggregateInfo clone() { return new AggregateInfo(this); } + public AggregateInfo clone() { + return new AggregateInfo(this); + } + + public List getInputPartitionExprs() { + return partitionExprs_ != null ? partitionExprs_ : groupingExprs_; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 1929d9522d2142..3f73a31c8dc652 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1364,6 +1364,27 @@ public SlotRef getSrcSlotRef() { return sourceExpr.get(0).getSrcSlotRef(); } + public boolean comeFrom(Expr srcExpr) { + SlotRef unwrapSloRef = this.unwrapSlotRef(); + if (unwrapSloRef == null) { + return false; + } + SlotRef unwrapSrcSlotRef = srcExpr.unwrapSlotRef(); + if (unwrapSrcSlotRef == null) { + return false; + } + if (unwrapSloRef.columnEqual(unwrapSrcSlotRef)) { + return true; + } + // check source expr + SlotDescriptor slotDescriptor = unwrapSloRef.getDesc(); + if (slotDescriptor == null || slotDescriptor.getSourceExprs() == null + || slotDescriptor.getSourceExprs().size() != 1) { + return false; + } + return slotDescriptor.getSourceExprs().get(0).comeFrom(unwrapSrcSlotRef); + } + /** * If 'this' is a SlotRef or a Cast that wraps a SlotRef, returns that SlotRef. * Otherwise returns null. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java index 57c492f44e7a0d..cd52ef6de07808 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -123,6 +123,43 @@ public void setDesc(SlotDescriptor desc) { this.desc = desc; } + public boolean columnEqual(Expr srcExpr) { + Preconditions.checkState(srcExpr instanceof SlotRef); + SlotRef srcSlotRef = (SlotRef) srcExpr; + if (desc != null && srcSlotRef.desc != null) { + return desc.getId().equals(srcSlotRef.desc.getId()); + } + TableName srcTableName = srcSlotRef.tblName; + if (srcTableName == null && srcSlotRef.desc != null) { + srcTableName = srcSlotRef.getTableName(); + } + TableName thisTableName = tblName; + if (thisTableName == null && desc != null) { + thisTableName = getTableName(); + } + if ((thisTableName == null) != (srcTableName == null)) { + return false; + } + if (thisTableName != null && !thisTableName.equals(srcTableName)) { + return false; + } + String srcColumnName = srcSlotRef.getColumnName(); + if (srcColumnName == null && srcSlotRef.desc != null && srcSlotRef.getDesc().getColumn() != null) { + srcColumnName = srcSlotRef.desc.getColumn().getName(); + } + String thisColumnName = getColumnName(); + if (thisColumnName == null && desc != null && desc.getColumn() != null) { + thisColumnName = desc.getColumn().getName(); + } + if ((thisColumnName == null) != (srcColumnName == null)) { + return false; + } + if (thisColumnName != null && !thisColumnName.toLowerCase().equals(srcColumnName.toLowerCase())) { + return false; + } + return true; + } + @Override public void vectorizedAnalyze(Analyzer analyzer) { computeOutputColumn(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 1a293abdbb054a..ec474562a7a506 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -24,12 +24,14 @@ import org.apache.doris.thrift.TDataPartition; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.logging.log4j.Logger; + import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 9aeaba9b2f7214..8f61a499a7bacf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -275,10 +275,15 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment) private PlanFragment createScanFragment(PlanNode node) { if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); - } else if (node instanceof SchemaScanNode) { + } else if (node instanceof SchemaScanNode) { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); + } else if (node instanceof OlapScanNode) { + // olap scan node + OlapScanNode olapScanNode = (OlapScanNode) node; + return new PlanFragment(ctx_.getNextFragmentId(), node, + olapScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); } else { - // es scan node, olap scan node are random partitioned + // other scan nodes are random partitioned: es, broker return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM); } } @@ -542,8 +547,10 @@ private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, List, - List> scanNodeWithJoinConjuncts, List cannotReason) { + private boolean dataDistributionMatchEqPredicate(Map, List> scanNodeWithJoinConjuncts, + List cannotReason) { + // If left table and right table is same table and they select same single partition or no partition + // they are naturally colocate relationship no need to check colocate group for (Map.Entry, List> entry : scanNodeWithJoinConjuncts.entrySet()) { OlapScanNode leftScanNode = entry.getKey().first; OlapScanNode rightScanNode = entry.getKey().second; @@ -842,8 +849,10 @@ private PlanFragment createSetOperationNodeFragment( } // There is at least one partitioned child fragment. + // TODO(ML): here PlanFragment setOperationFragment = new PlanFragment(ctx_.getNextFragmentId(), setOperationNode, - DataPartition.RANDOM); + new DataPartition(TPartitionType.HASH_PARTITIONED, + setOperationNode.getMaterializedResultExprLists_().get(0))); for (int i = 0; i < childFragments.size(); ++i) { PlanFragment childFragment = childFragments.get(i); /* if (childFragment.isPartitioned() && childFragment.getPlanRoot().getNumInstances() > 1) { @@ -962,10 +971,7 @@ private PlanFragment createAggregationFragment( if (isDistinct) { return createPhase2DistinctAggregationFragment(node, childFragment, fragments); } else { - // Check table's distribution. See #4481. - PlanNode childPlan = childFragment.getPlanRoot(); - if (childPlan instanceof OlapScanNode && - ((OlapScanNode) childPlan).getOlapTable().meetAggDistributionRequirements(node.getAggInfo())) { + if (canColocateAgg(node.getAggInfo(), childFragment.getInputDataPartition())) { childFragment.addPlanRoot(node); return childFragment; } else { @@ -974,6 +980,64 @@ private PlanFragment createAggregationFragment( } } + /** + * Colocate Agg can be performed when the following 2 conditions are met at the same time. + * 1. Session variables disable_colocate_plan = false + * 2. The input data partition of child fragment < agg node partition exprs + */ + private boolean canColocateAgg(AggregateInfo aggregateInfo, List childFragmentDataPartition) { + // Condition1 + if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { + LOG.debug("Agg node is not colocate in:" + ConnectContext.get().getQueryDetail().getQueryId() + + ", reason:" + DistributedPlanColocateRule.SESSION_DISABLED); + return false; + } + + // Condition2 + List aggPartitionExprs = aggregateInfo.getInputPartitionExprs(); + for (DataPartition childDataPartition : childFragmentDataPartition) { + if (dataPartitionMatchAggInfo(childDataPartition, aggPartitionExprs)) { + return true; + } + } + return false; + } + + /** + * The aggPartitionExprs should contains all of data partition columns. + * Since aggPartitionExprs may be derived from the transformation of the lower tuple, + * it is necessary to find the source expr of itself firstly. + *

+ * For example: + * Data Partition: t1.k1, t1.k2 + * Agg Partition Exprs: t1.k1, t1.k2, t1.k3 + * Return: true + *

+ * Data Partition: t1.k1, t1.k2 + * Agg Partition Exprs: t1.k1, t2.k2 + * Return: false + */ + private boolean dataPartitionMatchAggInfo(DataPartition dataPartition, List aggPartitionExprs) { + TPartitionType partitionType = dataPartition.getType(); + if (partitionType != TPartitionType.HASH_PARTITIONED) { + return false; + } + List dataPartitionExprs = dataPartition.getPartitionExprs(); + for (Expr dataPartitionExpr : dataPartitionExprs) { + boolean match = false; + for (Expr aggPartitionExpr : aggPartitionExprs) { + if (aggPartitionExpr.comeFrom(dataPartitionExpr)) { + match = true; + break; + } + } + if (!match) { + return false; + } + } + return true; + } + private PlanFragment createRepeatNodeFragment( RepeatNode repeatNode, PlanFragment childFragment, ArrayList fragments) throws UserException { @@ -1204,6 +1268,7 @@ private PlanFragment createAnalyticFragment( // required if the sort partition exprs reference a tuple that is made nullable in // 'childFragment' to bring NULLs from outer-join non-matches together. DataPartition sortPartition = sortNode.getInputPartition(); + // TODO(ML): here if (!childFragment.getDataPartition().equals(sortPartition)) { // TODO(zc) || childFragment.refsNullableTupleId(sortPartition.getPartitionExprs())) { analyticFragment = createParentFragment(childFragment, sortNode.getInputPartition()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 4d260ae6a238e2..6b1980f4abf57c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -775,7 +775,7 @@ private boolean isSlotRefNested(Expr expr) { return expr instanceof SlotRef; } - private void filterDeletedRows(Analyzer analyzer) throws AnalysisException{ + private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { if (!Util.showHiddenColumns() && olapTable.hasDeleteSign()) { SlotRef deleteSignSlot = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN); deleteSignSlot.analyze(analyzer); @@ -785,4 +785,16 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException{ conjuncts.add(conjunct); } } + + public DataPartition constructInputPartitionByDistributionInfo() { + DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); + Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); + List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + List dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return DataPartition.hashPartitioned(dataDistributeExprs); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 27ada66860a9e3..234e7d3bd8fe2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -29,6 +29,7 @@ import org.apache.doris.thrift.TResultSinkType; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; @@ -97,6 +98,17 @@ public class PlanFragment extends TreeNode { // TODO: improve this comment, "input" is a bit misleading private final DataPartition dataPartition; + // specification of the actually input partition of this fragment when transmitting to be. + // By default, the value of the data partition in planner and the data partition transmitted to be are the same. + // So this attribute is empty. + // But sometimes the planned value and the serialized value are inconsistent. You need to set this value. + // At present, this situation only occurs in the fragment where the scan node is located. + // Since the data partition expression of the scan node is actually constructed from the schema of the table, + // the expression is not analyzed. + // This will cause this expression to not be serialized correctly and transmitted to be. + // In this case, you need to set this attribute to DataPartition RANDOM to avoid the problem. + private DataPartition dataPartitionForThrift; + // specification of how the output of this fragment is partitioned (i.e., how // it's sent to its destination); // if the output is UNPARTITIONED, it is being broadcast @@ -128,6 +140,11 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) { setFragmentInPlanTree(planRoot); } + public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition, DataPartition partitionForThrift) { + this(id, root, partition); + this.dataPartitionForThrift = partitionForThrift; + } + /** * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node. * Does not traverse the children of ExchangeNodes because those must belong to a @@ -212,7 +229,11 @@ public TPlanFragment toThrift() { if (sink != null) { result.setOutputSink(sink.toThrift()); } - result.setPartition(dataPartition.toThrift()); + if (dataPartitionForThrift == null) { + result.setPartition(dataPartition.toThrift()); + } else { + result.setPartition(dataPartitionForThrift.toThrift()); + } // TODO chenhao , calculated by cost result.setMinReservationBytes(0); @@ -260,6 +281,15 @@ public void setDestination(ExchangeNode destNode) { dest.addChild(this); } + public List getInputDataPartition() { + List result = Lists.newArrayList(); + result.add(getDataPartition()); + for (PlanFragment child : children) { + result.add(child.getOutputPartition()); + } + return result; + } + public DataPartition getDataPartition() { return dataPartition; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index b0c33da2a158af..dd7f6363984b5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -645,7 +645,7 @@ public String toString() { sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF)); return sb.toString(); } - + public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) { if (this instanceof ScanNode && tupleIds.contains(tupleId)) { return (ScanNode) this; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 8ff49e32364635..5c28b224f3d042 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -56,7 +56,6 @@ public static void setUp() throws Exception { Catalog.getCurrentCatalog().createTable(createTableStmt); } - @AfterClass public static void tearDown() { File file = new File(runningDir); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java index f9f0f9362330fd..ebe7da33c75109 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapScanNodeTest.java @@ -17,14 +17,6 @@ package org.apache.doris.planner; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; - import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IntLiteral; @@ -34,9 +26,18 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + public class OlapScanNodeTest { // columnA in (1) hashmode=3 @Test @@ -157,6 +158,21 @@ public void testHashForIntLiteral() { long hashValue = hashKey.getHashValue(); long mod = (int) ((hashValue & 0xffffffff) % 3); Assert.assertEquals(mod, 2); - } + } } + +// @Test +// public void testConstructInputPartitionByDistributionInfo(@Injectable OlapTable olapTable, +// @Injectable TupleDescriptor tupleDescriptor) { +// PlanNodeId planNodeId = new PlanNodeId(1); +// OlapScanNode olapScanNode = new OlapScanNode(planNodeId, tupleDescriptor, "scan node"); +// Deencapsulation.setField(olapScanNode, "olapTable", olapTable); +// new Expectations() { +// { +// olapTable.getDefaultDistributionInfo(); +// result = +// } +// }; +// olapScanNode.constructInputPartitionByDistributionInfo(); +// } }