Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,16 @@ public String debugString() {
}

@Override
protected String tupleDebugName() { return "agg-tuple"; }
protected String tupleDebugName() {
return "agg-tuple";
}

@Override
public AggregateInfo clone() { return new AggregateInfo(this); }
public AggregateInfo clone() {
return new AggregateInfo(this);
}

// Returns the exprs this aggregation is partitioned on: the explicit partition
// exprs when present, otherwise the grouping exprs.
public List<Expr> getInputPartitionExprs() {
    return partitionExprs_ != null ? partitionExprs_ : groupingExprs_;
}
}
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,27 @@ public SlotRef getSrcSlotRef() {
return sourceExpr.get(0).getSrcSlotRef();
}

/**
 * Returns true if this expr (after unwrapping any cast) refers to the same column
 * as {@code srcExpr}, either directly or transitively through a slot whose
 * descriptor has exactly one source expr.
 */
public boolean comeFrom(Expr srcExpr) {
    SlotRef thisSlot = unwrapSlotRef();
    SlotRef srcSlot = srcExpr.unwrapSlotRef();
    // Both sides must reduce to a plain SlotRef, otherwise there is no column lineage.
    if (thisSlot == null || srcSlot == null) {
        return false;
    }
    if (thisSlot.columnEqual(srcSlot)) {
        return true;
    }
    // Follow the lineage one level up; only an unambiguous (single) source expr counts.
    SlotDescriptor slotDesc = thisSlot.getDesc();
    if (slotDesc == null || slotDesc.getSourceExprs() == null || slotDesc.getSourceExprs().size() != 1) {
        return false;
    }
    return slotDesc.getSourceExprs().get(0).comeFrom(srcSlot);
}

/**
* If 'this' is a SlotRef or a Cast that wraps a SlotRef, returns that SlotRef.
* Otherwise returns null.
Expand Down
37 changes: 37 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,43 @@ public void setDesc(SlotDescriptor desc) {
this.desc = desc;
}

/**
 * Column-level equality between this SlotRef and {@code srcExpr}, which must also be
 * a SlotRef. When both sides carry an analyzed SlotDescriptor the unique slot ids are
 * compared; otherwise the comparison falls back to table name plus column name,
 * deriving either from the descriptor when it is not set explicitly.
 *
 * @param srcExpr the other slot; callers must pass a SlotRef
 * @return true if both refs denote the same column
 */
public boolean columnEqual(Expr srcExpr) {
    Preconditions.checkState(srcExpr instanceof SlotRef);
    SlotRef srcSlotRef = (SlotRef) srcExpr;
    // Fast path: both slots are analyzed, compare unique slot ids directly.
    if (desc != null && srcSlotRef.desc != null) {
        return desc.getId().equals(srcSlotRef.desc.getId());
    }
    // Resolve table names, falling back to the descriptor when not set explicitly.
    TableName srcTableName = srcSlotRef.tblName;
    if (srcTableName == null && srcSlotRef.desc != null) {
        srcTableName = srcSlotRef.getTableName();
    }
    TableName thisTableName = tblName;
    if (thisTableName == null && desc != null) {
        thisTableName = getTableName();
    }
    // A name present on one side but absent on the other can never match.
    if ((thisTableName == null) != (srcTableName == null)) {
        return false;
    }
    if (thisTableName != null && !thisTableName.equals(srcTableName)) {
        return false;
    }
    // Resolve column names the same way.
    String srcColumnName = srcSlotRef.getColumnName();
    if (srcColumnName == null && srcSlotRef.desc != null && srcSlotRef.desc.getColumn() != null) {
        srcColumnName = srcSlotRef.desc.getColumn().getName();
    }
    String thisColumnName = getColumnName();
    if (thisColumnName == null && desc != null && desc.getColumn() != null) {
        thisColumnName = desc.getColumn().getName();
    }
    if ((thisColumnName == null) != (srcColumnName == null)) {
        return false;
    }
    // equalsIgnoreCase avoids the locale-sensitive behavior of toLowerCase()
    // (e.g. the Turkish dotless-i), which the previous comparison was exposed to.
    if (thisColumnName != null && !thisColumnName.equalsIgnoreCase(srcColumnName)) {
        return false;
    }
    return true;
}

@Override
public void vectorizedAnalyze(Analyzer analyzer) {
computeOutputColumn(analyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.apache.doris.thrift.TDataPartition;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.Logger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,15 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment)
/**
 * Creates a fragment for a scan node.
 * MySQL/ODBC/Schema scans run unpartitioned (single node). OLAP scans are planned
 * with the table's hash distribution so parents can exploit data locality, while the
 * partition actually serialized to the BE is RANDOM, because the distribution exprs
 * are built from the table schema and never analyzed. Other scans (e.g. ES, broker)
 * are randomly partitioned.
 */
private PlanFragment createScanFragment(PlanNode node) {
    if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) {
        return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
    } else if (node instanceof SchemaScanNode) {
        return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
    } else if (node instanceof OlapScanNode) {
        // olap scan node: plan with the table's distribution, serialize RANDOM to the BE
        OlapScanNode olapScanNode = (OlapScanNode) node;
        return new PlanFragment(ctx_.getNextFragmentId(), node,
                olapScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM);
    } else {
        // other scan nodes are random partitioned: es, broker
        return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM);
    }
}
Expand Down Expand Up @@ -542,8 +547,10 @@ private OlapScanNode genSrcScanNode(Expr expr, PlanFragment planFragment, List<S
}
}

private boolean dataDistributionMatchEqPredicate(Map<Pair<OlapScanNode, OlapScanNode>,
List<BinaryPredicate>> scanNodeWithJoinConjuncts, List<String> cannotReason) {
private boolean dataDistributionMatchEqPredicate(Map<Pair<OlapScanNode, OlapScanNode>, List<BinaryPredicate>> scanNodeWithJoinConjuncts,
List<String> cannotReason) {
// If left table and right table is same table and they select same single partition or no partition
// they are naturally colocate relationship no need to check colocate group
for (Map.Entry<Pair<OlapScanNode, OlapScanNode>, List<BinaryPredicate>> entry : scanNodeWithJoinConjuncts.entrySet()) {
OlapScanNode leftScanNode = entry.getKey().first;
OlapScanNode rightScanNode = entry.getKey().second;
Expand Down Expand Up @@ -842,8 +849,10 @@ private PlanFragment createSetOperationNodeFragment(
}

// There is at least one partitioned child fragment.
// TODO(ML): here
PlanFragment setOperationFragment = new PlanFragment(ctx_.getNextFragmentId(), setOperationNode,
DataPartition.RANDOM);
new DataPartition(TPartitionType.HASH_PARTITIONED,
setOperationNode.getMaterializedResultExprLists_().get(0)));
for (int i = 0; i < childFragments.size(); ++i) {
PlanFragment childFragment = childFragments.get(i);
/* if (childFragment.isPartitioned() && childFragment.getPlanRoot().getNumInstances() > 1) {
Expand Down Expand Up @@ -962,10 +971,7 @@ private PlanFragment createAggregationFragment(
if (isDistinct) {
return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
} else {
// Check table's distribution. See #4481.
PlanNode childPlan = childFragment.getPlanRoot();
if (childPlan instanceof OlapScanNode &&
((OlapScanNode) childPlan).getOlapTable().meetAggDistributionRequirements(node.getAggInfo())) {
if (canColocateAgg(node.getAggInfo(), childFragment.getInputDataPartition())) {
childFragment.addPlanRoot(node);
return childFragment;
} else {
Expand All @@ -974,6 +980,64 @@ private PlanFragment createAggregationFragment(
}
}

/**
 * Colocate aggregation can be performed only when both conditions hold:
 * 1. The session variable disable_colocate_plan is false.
 * 2. At least one input data partition of the child fragment is covered by the
 *    agg node's partition exprs (see {@code dataPartitionMatchAggInfo}).
 */
private boolean canColocateAgg(AggregateInfo aggregateInfo, List<DataPartition> childFragmentDataPartition) {
    // Condition 1: colocate planning may be disabled for the session.
    ConnectContext connectContext = ConnectContext.get();
    if (connectContext.getSessionVariable().isDisableColocatePlan()) {
        // Parameterized logging: arguments are only rendered when debug is enabled,
        // unlike the previous eager string concatenation.
        LOG.debug("Agg node is not colocate in: {}, reason: {}",
                connectContext.getQueryDetail().getQueryId(),
                DistributedPlanColocateRule.SESSION_DISABLED);
        return false;
    }

    // Condition 2: some input partition must already match the agg's partition exprs.
    List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
    for (DataPartition childDataPartition : childFragmentDataPartition) {
        if (dataPartitionMatchAggInfo(childDataPartition, aggPartitionExprs)) {
            return true;
        }
    }
    return false;
}

/**
 * Checks whether {@code dataPartition} is subsumed by {@code aggPartitionExprs}: the
 * agg partition exprs must contain every data partition column. Because the agg
 * partition exprs may be derived from a transformation of a lower tuple, each one is
 * traced back to its source via {@code Expr#comeFrom} rather than compared directly.
 * <p>
 * For example:
 * Data Partition: t1.k1, t1.k2
 * Agg Partition Exprs: t1.k1, t1.k2, t1.k3
 * Return: true
 * <p>
 * Data Partition: t1.k1, t1.k2
 * Agg Partition Exprs: t1.k1, t2.k2
 * Return: false
 */
private boolean dataPartitionMatchAggInfo(DataPartition dataPartition, List<Expr> aggPartitionExprs) {
    // Only hash partitions can possibly line up with the agg's hash exprs.
    if (dataPartition.getType() != TPartitionType.HASH_PARTITIONED) {
        return false;
    }
    // Every data partition column must be traceable to some agg partition expr.
    for (Expr dataPartitionExpr : dataPartition.getPartitionExprs()) {
        boolean covered = aggPartitionExprs.stream()
                .anyMatch(aggPartitionExpr -> aggPartitionExpr.comeFrom(dataPartitionExpr));
        if (!covered) {
            return false;
        }
    }
    return true;
}

private PlanFragment createRepeatNodeFragment(
RepeatNode repeatNode, PlanFragment childFragment, ArrayList<PlanFragment> fragments)
throws UserException {
Expand Down Expand Up @@ -1204,6 +1268,7 @@ private PlanFragment createAnalyticFragment(
// required if the sort partition exprs reference a tuple that is made nullable in
// 'childFragment' to bring NULLs from outer-join non-matches together.
DataPartition sortPartition = sortNode.getInputPartition();
// TODO(ML): here
if (!childFragment.getDataPartition().equals(sortPartition)) {
// TODO(zc) || childFragment.refsNullableTupleId(sortPartition.getPartitionExprs())) {
analyticFragment = createParentFragment(childFragment, sortNode.getInputPartition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ private boolean isSlotRefNested(Expr expr) {
return expr instanceof SlotRef;
}

private void filterDeletedRows(Analyzer analyzer) throws AnalysisException{
private void filterDeletedRows(Analyzer analyzer) throws AnalysisException {
if (!Util.showHiddenColumns() && olapTable.hasDeleteSign()) {
SlotRef deleteSignSlot = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN);
deleteSignSlot.analyze(analyzer);
Expand All @@ -785,4 +785,16 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException{
conjuncts.add(conjunct);
}
}

/**
 * Builds the hash DataPartition implied by the table's default distribution info:
 * one SlotRef per distribution column, resolved against this scan's table ref.
 * NOTE(review): the SlotRefs are constructed from the schema and are not analyzed.
 */
public DataPartition constructInputPartitionByDistributionInfo() {
    DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
    // Only hash-distributed tables carry distribution columns.
    Preconditions.checkState(distributionInfo instanceof HashDistributionInfo);
    List<Expr> partitionExprs = Lists.newArrayList();
    for (Column distributionColumn : ((HashDistributionInfo) distributionInfo).getDistributionColumns()) {
        partitionExprs.add(new SlotRef(desc.getRef().getName(), distributionColumn.getName()));
    }
    return DataPartition.hashPartitioned(partitionExprs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.thrift.TResultSinkType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -97,6 +98,17 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// TODO: improve this comment, "input" is a bit misleading
private final DataPartition dataPartition;

// Specification of the actual input partition of this fragment as transmitted to the BE.
// By default the data partition used by the planner and the one sent to the BE are the same,
// so this attribute is left null.
// Sometimes, however, the planned value and the serialized value must differ, and this value
// is set. At present that only happens in the fragment containing the scan node: its data
// partition expressions are constructed from the table schema and are never analyzed, so they
// cannot be serialized correctly and transmitted to the BE.
// In that case this attribute is set to DataPartition.RANDOM to avoid the problem.
private DataPartition dataPartitionForThrift;

// specification of how the output of this fragment is partitioned (i.e., how
// it's sent to its destination);
// if the output is UNPARTITIONED, it is being broadcast
Expand Down Expand Up @@ -128,6 +140,11 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
setFragmentInPlanTree(planRoot);
}

/**
 * Creates a fragment whose planner-side data partition differs from the partition
 * serialized to the BE; {@code partitionForThrift} is what toThrift() will send.
 */
public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition, DataPartition partitionForThrift) {
    this(id, root, partition);
    this.dataPartitionForThrift = partitionForThrift;
}

/**
* Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node.
* Does not traverse the children of ExchangeNodes because those must belong to a
Expand Down Expand Up @@ -212,7 +229,11 @@ public TPlanFragment toThrift() {
if (sink != null) {
result.setOutputSink(sink.toThrift());
}
result.setPartition(dataPartition.toThrift());
if (dataPartitionForThrift == null) {
result.setPartition(dataPartition.toThrift());
} else {
result.setPartition(dataPartitionForThrift.toThrift());
}

// TODO chenhao , calculated by cost
result.setMinReservationBytes(0);
Expand Down Expand Up @@ -260,6 +281,15 @@ public void setDestination(ExchangeNode destNode) {
dest.addChild(this);
}

/**
 * Collects the partitions feeding this fragment: its own data partition plus the
 * output partition of every child fragment.
 */
public List<DataPartition> getInputDataPartition() {
    List<DataPartition> inputPartitions = Lists.newArrayList(getDataPartition());
    for (PlanFragment childFragment : children) {
        inputPartitions.add(childFragment.getOutputPartition());
    }
    return inputPartitions;
}

public DataPartition getDataPartition() {
return dataPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ public String toString() {
sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF));
return sb.toString();
}

public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) {
if (this instanceof ScanNode && tupleIds.contains(tupleId)) {
return (ScanNode) this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public static void setUp() throws Exception {
Catalog.getCurrentCatalog().createTable(createTableStmt);
}


@AfterClass
public static void tearDown() {
File file = new File(runningDir);
Expand Down
Loading