Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids.properties;

import org.apache.doris.nereids.PlanContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.trees.expressions.Alias;
Expand Down Expand Up @@ -239,18 +240,32 @@ public PhysicalProperties visitPhysicalHashJoin(
DistributionSpecHash leftHashSpec = (DistributionSpecHash) leftOutputProperty.getDistributionSpec();
DistributionSpecHash rightHashSpec = (DistributionSpecHash) rightOutputProperty.getDistributionSpec();

// colocate join
if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
return new PhysicalProperties(DistributionSpecHash.merge(leftHashSpec, rightHashSpec));
}
switch (hashJoin.getJoinType()) {
case INNER_JOIN:
case CROSS_JOIN:
return new PhysicalProperties(DistributionSpecHash.merge(
leftHashSpec, rightHashSpec, leftHashSpec.getShuffleType()));
case LEFT_SEMI_JOIN:
case LEFT_ANTI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN:
case LEFT_OUTER_JOIN:
return new PhysicalProperties(leftHashSpec);
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
return new PhysicalProperties(rightHashSpec);
} else {
// retain left shuffle type, since coordinator use left most node to schedule fragment
// forbid colocate join, since right table already shuffle
return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin(
leftHashSpec.getShuffleType()));
}
case FULL_OUTER_JOIN:
return PhysicalProperties.ANY;
default:
throw new AnalysisException("unknown join type " + hashJoin.getJoinType());
}

// shuffle, if left child is natural mean current join is bucket shuffle join
// and remain natural for colocate join on upper join.
return new PhysicalProperties(DistributionSpecHash.merge(
leftHashSpec, rightHashSpec, leftHashSpec.getShuffleType()));
}

throw new RuntimeException("Could not derive hash join's output properties. join: " + hashJoin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,16 @@
public class DistributionSpecHash extends DistributionSpec {

private final List<ExprId> orderedShuffledColumns;

private final ShuffleType shuffleType;
// use for satisfied judge
private final List<Set<ExprId>> equivalenceExprIds;
private final Map<ExprId, Integer> exprIdToEquivalenceSet;

// below two attributes use for colocate join
// below two attributes use for colocate join, only store one table info is enough
private final long tableId;

private final Set<Long> partitionIds;

private final long selectedIndexId;

// use for satisfied judge
private final List<Set<ExprId>> equivalenceExprIds;

private final Map<ExprId, Integer> exprIdToEquivalenceSet;

/**
* Use for no need set table related attributes.
*/
Expand Down Expand Up @@ -239,6 +234,11 @@ public DistributionSpecHash withShuffleType(ShuffleType shuffleType) {
equivalenceExprIds, exprIdToEquivalenceSet);
}

public DistributionSpecHash withShuffleTypeAndForbidColocateJoin(ShuffleType shuffleType) {
return new DistributionSpecHash(orderedShuffledColumns, shuffleType, -1, -1, partitionIds,
equivalenceExprIds, exprIdToEquivalenceSet);
}

/**
* generate a new DistributionSpec after projection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, Distr
|| rightHashSpec.getShuffleType() != ShuffleType.NATURAL) {
return false;
}

final long leftTableId = leftHashSpec.getTableId();
final long rightTableId = rightHashSpec.getTableId();
final Set<Long> leftTablePartitions = leftHashSpec.getPartitionIds();
Expand Down
Loading