Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1674,9 +1674,8 @@ public PlanFragment visitPhysicalSetOperation(
throw new RuntimeException("not support set operation type " + setOperation);
}

setOperation.children().stream()
.map(Plan::getOutput)
.map(l -> l.stream()
setOperation.getRegularChildrenOutputs().stream()
.map(o -> o.stream()
.map(e -> ExpressionTranslator.translate(e, context))
.collect(ImmutableList.toImmutableList()))
.forEach(setOperationNode::addResultExprLists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
}
DistributionSpecHash distributionSpecHash = (DistributionSpecHash) childDistribution;
int[] offsetsOfCurrentChild = new int[distributionSpecHash.getOrderedShuffledColumns().size()];
for (int j = 0; j < setOperation.getChildOutput(i).size(); j++) {
for (int j = 0; j < setOperation.getRegularChildOutput(i).size(); j++) {
int offset = distributionSpecHash.getExprIdToEquivalenceSet()
.getOrDefault(setOperation.getChildOutput(i).get(j).getExprId(), -1);
.getOrDefault(setOperation.getRegularChildOutput(i).get(j).getExprId(), -1);
if (offset >= 0) {
offsetsOfCurrentChild[offset] = j;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
import org.apache.doris.nereids.util.JoinUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -324,11 +324,11 @@ public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> project, Voi
public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) {
// process must shuffle
visit(setOperation, context);
// process set operation
// union with only constant exprs list
if (children.isEmpty()) {
return true;
}

// process set operation
PhysicalProperties requiredProperty = requiredProperties.get(0);
DistributionSpec requiredDistributionSpec = requiredProperty.getDistributionSpec();
if (requiredDistributionSpec instanceof DistributionSpecGather) {
Expand Down Expand Up @@ -379,16 +379,38 @@ public Boolean visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> so
return true;
}

/**
 * Tell whether the hash key order actually produced by the shuffle side matches the order
 * derived from the not-shuffle side.
 *
 * @param notShuffleSideOutput hash spec used by the real output of the not-shuffle side
 * @param shuffleSideOutput hash spec used by the real output of the shuffle side
 * @param notShuffleSideRequired hash spec required of the not-shuffle side
 * @param shuffleSideRequired hash spec required of the shuffle side
 * @return true if the shuffle side's ordered shuffle columns equal the derived order
 */
private boolean bothSideShuffleKeysAreSameOrder(
        DistributionSpecHash notShuffleSideOutput, DistributionSpecHash shuffleSideOutput,
        DistributionSpecHash notShuffleSideRequired, DistributionSpecHash shuffleSideRequired) {
    // what the shuffle side really outputs
    List<ExprId> actualOrder = shuffleSideOutput.getOrderedShuffledColumns();
    // what the shuffle side should output, derived from the other side
    List<ExprId> expectedOrder = calAnotherSideRequiredShuffleIds(
            notShuffleSideOutput, notShuffleSideRequired, shuffleSideRequired);
    return actualOrder.equals(expectedOrder);
}

/**
* Calculate the correct hash key order for the shuffle side.
* For example,
* if the real hash keys of the not-shuffle side are 1 2 3,
* the required hash keys of the not-shuffle side are 3 2 1,
* and the required hash keys of the shuffle side are 6 5 4,
* then the real output hash key order of the shuffle side should be 4 5 6.
*
* @param notShuffleSideOutput not shuffle side real output used hash spec
* @param notShuffleSideRequired not shuffle side required used hash spec
* @param shuffleSideRequired shuffle side required hash spec
* @return shuffle side real output used hash key order
*/
private List<ExprId> calAnotherSideRequiredShuffleIds(DistributionSpecHash notShuffleSideOutput,
DistributionSpecHash notShuffleSideRequired, DistributionSpecHash shuffleSideRequired) {
List<ExprId> rightShuffleIds = new ArrayList<>();
ImmutableList.Builder<ExprId> rightShuffleIds = ImmutableList.builder();
for (ExprId scanId : notShuffleSideOutput.getOrderedShuffledColumns()) {
int index = notShuffleSideRequired.getOrderedShuffledColumns().indexOf(scanId);
if (index == -1) {
Expand All @@ -401,12 +423,23 @@ private List<ExprId> calAnotherSideRequiredShuffleIds(DistributionSpecHash notSh
}
}
}
Preconditions.checkArgument(index != -1);
Preconditions.checkState(index != -1, "index could not be -1");
rightShuffleIds.add(shuffleSideRequired.getOrderedShuffledColumns().get(index));
}
return rightShuffleIds;
return rightShuffleIds.build();
}

/**
* Generate the PhysicalProperties that the real output of the shuffle side should follow. For more
* information, see the comment on calAnotherSideRequiredShuffleIds.
*
* @param shuffleType real output shuffle type
* @param notShuffleSideOutput not shuffle side real output used hash spec
* @param shuffleSideOutput shuffle side real output used hash spec
* @param notShuffleSideRequired not shuffle side required used hash spec
* @param shuffleSideRequired shuffle side required hash spec
* @return shuffle side new required hash spec
*/
private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType,
DistributionSpecHash notShuffleSideOutput, DistributionSpecHash shuffleSideOutput,
DistributionSpecHash notShuffleSideRequired, DistributionSpecHash shuffleSideRequired) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.util.Utils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -60,21 +61,6 @@ public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shu
this(orderedShuffledColumns, shuffleType, -1L, Collections.emptySet());
}

/**
 * Used to merge two lists of shuffle columns into one hash spec.
 *
 * <p>The spec is first built from {@code leftColumns} by delegating to another constructor,
 * passing {@code -1L} and an empty set. Each right column is then registered at the same
 * position as its left counterpart: it is mapped to that position in
 * {@code exprIdToEquivalenceSet} and added to the set at that position in
 * {@code equivalenceExprIds}.
 *
 * @param leftColumns columns the spec is initially built from
 * @param rightColumns columns paired position-by-position with {@code leftColumns};
 *         must be non-null and the same size as {@code leftColumns}
 * @param shuffleType shuffle type forwarded to the delegated constructor
 */
public DistributionSpecHash(List<ExprId> leftColumns, List<ExprId> rightColumns, ShuffleType shuffleType) {
this(leftColumns, shuffleType, -1L, Collections.emptySet());
// the null check cannot come earlier: the this(...) call must be the first statement
Objects.requireNonNull(rightColumns);
Preconditions.checkArgument(leftColumns.size() == rightColumns.size());
int i = 0;
// NOTE(review): assumes the delegated constructor filled equivalenceExprIds with one mutable
// set per left column, in order -- confirm against that constructor
Iterator<Set<ExprId>> iter = equivalenceExprIds.iterator();
for (ExprId id : rightColumns) {
// the right column shares the position index of the left column it is paired with
exprIdToEquivalenceSet.put(id, i++);
iter.next().add(id);
}
}

/**
* Used in ut
*/
Expand All @@ -88,18 +74,24 @@ public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shu
*/
public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType,
long tableId, long selectedIndexId, Set<Long> partitionIds) {
this.orderedShuffledColumns = Objects.requireNonNull(orderedShuffledColumns);
this.shuffleType = Objects.requireNonNull(shuffleType);
this.partitionIds = Objects.requireNonNull(partitionIds);
this.orderedShuffledColumns = ImmutableList.copyOf(
Objects.requireNonNull(orderedShuffledColumns, "orderedShuffledColumns should not null"));
this.shuffleType = Objects.requireNonNull(shuffleType, "shuffleType should not null");
this.partitionIds = ImmutableSet.copyOf(
Objects.requireNonNull(partitionIds, "partitionIds should not null"));
this.tableId = tableId;
this.selectedIndexId = selectedIndexId;
equivalenceExprIds = Lists.newArrayListWithCapacity(orderedShuffledColumns.size());
exprIdToEquivalenceSet = Maps.newHashMapWithExpectedSize(orderedShuffledColumns.size());
ImmutableList.Builder<Set<ExprId>> equivalenceExprIdsBuilder
= ImmutableList.builderWithExpectedSize(orderedShuffledColumns.size());
ImmutableMap.Builder<ExprId, Integer> exprIdToEquivalenceSetBuilder
= ImmutableMap.builderWithExpectedSize(orderedShuffledColumns.size());
int i = 0;
for (ExprId id : orderedShuffledColumns) {
exprIdToEquivalenceSet.put(id, i++);
equivalenceExprIds.add(Sets.newHashSet(id));
equivalenceExprIdsBuilder.add(Sets.newHashSet(id));
exprIdToEquivalenceSetBuilder.put(id, i++);
}
this.equivalenceExprIds = equivalenceExprIdsBuilder.build();
this.exprIdToEquivalenceSet = exprIdToEquivalenceSetBuilder.buildKeepingLast();
}

/**
Expand All @@ -108,8 +100,8 @@ public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shu
public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType,
long tableId, Set<Long> partitionIds, List<Set<ExprId>> equivalenceExprIds,
Map<ExprId, Integer> exprIdToEquivalenceSet) {
this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds, equivalenceExprIds,
exprIdToEquivalenceSet);
this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds,
equivalenceExprIds, exprIdToEquivalenceSet);
}

/**
Expand All @@ -118,31 +110,37 @@ public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shu
public DistributionSpecHash(List<ExprId> orderedShuffledColumns, ShuffleType shuffleType, long tableId,
long selectedIndexId, Set<Long> partitionIds, List<Set<ExprId>> equivalenceExprIds,
Map<ExprId, Integer> exprIdToEquivalenceSet) {
this.orderedShuffledColumns = Objects.requireNonNull(orderedShuffledColumns);
this.shuffleType = Objects.requireNonNull(shuffleType);
this.orderedShuffledColumns = ImmutableList.copyOf(Objects.requireNonNull(orderedShuffledColumns,
"orderedShuffledColumns should not null"));
this.shuffleType = Objects.requireNonNull(shuffleType, "shuffleType should not null");
this.tableId = tableId;
this.selectedIndexId = selectedIndexId;
this.partitionIds = Objects.requireNonNull(partitionIds);
this.equivalenceExprIds = Objects.requireNonNull(equivalenceExprIds);
this.exprIdToEquivalenceSet = Objects.requireNonNull(exprIdToEquivalenceSet);
this.partitionIds = ImmutableSet.copyOf(
Objects.requireNonNull(partitionIds, "partitionIds should not null"));
this.equivalenceExprIds = ImmutableList.copyOf(
Objects.requireNonNull(equivalenceExprIds, "equivalenceExprIds should not null"));
this.exprIdToEquivalenceSet = ImmutableMap.copyOf(
Objects.requireNonNull(exprIdToEquivalenceSet, "exprIdToEquivalenceSet should not null"));
}

static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right, ShuffleType shuffleType) {
List<ExprId> orderedShuffledColumns = left.getOrderedShuffledColumns();
List<Set<ExprId>> equivalenceExprIds = Lists.newArrayListWithCapacity(orderedShuffledColumns.size());
ImmutableList.Builder<Set<ExprId>> equivalenceExprIds
= ImmutableList.builderWithExpectedSize(orderedShuffledColumns.size());
for (int i = 0; i < orderedShuffledColumns.size(); i++) {
Set<ExprId> equivalenceExprId = Sets.newHashSet();
ImmutableSet.Builder<ExprId> equivalenceExprId = ImmutableSet.builderWithExpectedSize(
left.getEquivalenceExprIds().get(i).size() + right.getEquivalenceExprIds().get(i).size());
equivalenceExprId.addAll(left.getEquivalenceExprIds().get(i));
equivalenceExprId.addAll(right.getEquivalenceExprIds().get(i));
equivalenceExprIds.add(equivalenceExprId);
equivalenceExprIds.add(equivalenceExprId.build());
}
Map<ExprId, Integer> exprIdToEquivalenceSet = Maps.newHashMapWithExpectedSize(
ImmutableMap.Builder<ExprId, Integer> exprIdToEquivalenceSet = ImmutableMap.builderWithExpectedSize(
left.getExprIdToEquivalenceSet().size() + right.getExprIdToEquivalenceSet().size());
exprIdToEquivalenceSet.putAll(left.getExprIdToEquivalenceSet());
exprIdToEquivalenceSet.putAll(right.getExprIdToEquivalenceSet());
return new DistributionSpecHash(orderedShuffledColumns, shuffleType,
left.getTableId(), left.getSelectedIndexId(), left.getPartitionIds(), equivalenceExprIds,
exprIdToEquivalenceSet);
left.getTableId(), left.getSelectedIndexId(), left.getPartitionIds(), equivalenceExprIds.build(),
exprIdToEquivalenceSet.buildKeepingLast());
}

static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
Expand All @@ -42,6 +44,7 @@
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.List;
Expand Down Expand Up @@ -191,12 +194,17 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
// intersect and except need do distinct, so we must do distribution on it.
DistributionSpec distributionRequestFromParent = requestPropertyFromParent.getDistributionSpec();
if (distributionRequestFromParent instanceof DistributionSpecHash) {
// shuffle according to parent require
DistributionSpecHash distributionSpecHash = (DistributionSpecHash) distributionRequestFromParent;
addRequestPropertyToChildren(createHashRequestAccordingToParent(
setOperation, distributionSpecHash, context));
} else {
addRequestPropertyToChildren(setOperation.children().stream()
.map(Plan::getOutputExprIds)
// shuffle all column
// TODO: for wide tables, maybe we should add an upper limit on the number of shuffle columns
addRequestPropertyToChildren(setOperation.getRegularChildrenOutputs().stream()
.map(childOutputs -> childOutputs.stream()
.map(SlotReference::getExprId)
.collect(ImmutableList.toImmutableList()))
.map(l -> PhysicalProperties.createHash(l, ShuffleType.EXECUTION_BUCKETED))
.collect(Collectors.toList()));
}
Expand All @@ -223,7 +231,7 @@ public Void visitPhysicalUnion(PhysicalUnion union, PlanContext context) {
} else {
// current be could not run const expr on appropriate node,
// so if we have constant exprs on union, the output of union always any
// then any request on children is useless.
// then any other request on children is useless.
for (int i = context.arity(); i > 0; --i) {
requiredPropertyList.add(PhysicalProperties.ANY);
}
Expand All @@ -249,11 +257,11 @@ public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, Pla
}

private List<PhysicalProperties> createHashRequestAccordingToParent(
Plan plan, DistributionSpecHash distributionRequestFromParent, PlanContext context) {
SetOperation setOperation, DistributionSpecHash distributionRequestFromParent, PlanContext context) {
List<PhysicalProperties> requiredPropertyList =
Lists.newArrayListWithCapacity(context.arity());
int[] outputOffsets = new int[distributionRequestFromParent.getOrderedShuffledColumns().size()];
List<Slot> setOperationOutputs = plan.getOutput();
List<NamedExpression> setOperationOutputs = setOperation.getOutputs();
// get the offset of bucketed columns of set operation
for (int i = 0; i < setOperationOutputs.size(); i++) {
int offset = distributionRequestFromParent.getExprIdToEquivalenceSet()
Expand All @@ -264,13 +272,13 @@ private List<PhysicalProperties> createHashRequestAccordingToParent(
}
// use the offset to generate children's request
for (int i = 0; i < context.arity(); i++) {
List<Slot> childOutput = plan.child(i).getOutput();
List<ExprId> childRequest = Lists.newArrayList();
List<SlotReference> childOutput = setOperation.getRegularChildOutput(i);
ImmutableList.Builder<ExprId> childRequest = ImmutableList.builder();
for (int offset : outputOffsets) {
childRequest.add(childOutput.get(offset).getExprId());
}
requiredPropertyList.add(PhysicalProperties.createHash(
childRequest, distributionRequestFromParent.getShuffleType()));
childRequest.build(), distributionRequestFromParent.getShuffleType()));
}
return requiredPropertyList;
}
Expand Down
Loading