Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ColocateTableIndex;
Expand All @@ -49,6 +48,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import avro.shaded.com.google.common.collect.Maps;

Expand Down Expand Up @@ -329,7 +329,7 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ
// bucket shuffle join is better than broadcast and shuffle join:
// it can reduce the network cost of the join, so Doris chooses it first
List<Expr> rhsPartitionxprs = Lists.newArrayList();
if (canBucketShuffleJoin(node, leftChildFragment, rightChildFragment, rhsPartitionxprs)) {
if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionxprs)) {
node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
DataPartition rhsJoinPartition =
new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs);
Expand Down Expand Up @@ -633,7 +633,7 @@ private boolean dataDistributionMatchEqPredicate(List<BinaryPredicate> eqJoinPre
return false;
}

private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment,
private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
List<Expr> rhsHashExprs) {
if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
return false;
Expand All @@ -649,14 +649,10 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr
return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
}

// 2.leftRoot be hashjoin node and not shuffle join
// 2. leftRoot is a hash join node
if (leftRoot instanceof HashJoinNode) {
while (leftRoot instanceof HashJoinNode) {
if (!((HashJoinNode)leftRoot).isShuffleJoin()) {
leftRoot = leftRoot.getChild(0);
} else {
return false;
}
leftRoot = leftRoot.getChild(0);
}
if (leftRoot instanceof OlapScanNode) {
return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
Expand All @@ -683,9 +679,12 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();

if (leftDistribution instanceof HashDistributionInfo) {
// use table_name + '.' + column_name as the check condition
List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
List<String> leftDistributeColumnNames = leftDistributeColumns.stream().
map(col -> leftTable.getName() + "." + col.getName()).collect(Collectors.toList());

List<Column> leftJoinColumns = new ArrayList<>();
List<String> leftJoinColumnNames = new ArrayList<>();
List<Expr> rightExprs = new ArrayList<>();
List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();

Expand All @@ -696,21 +695,32 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot,
continue;
}

SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();

leftJoinColumns.add(leftSlot.getColumn());
rightExprs.add(rhsJoinExpr);
SlotRef leftSlot = lhsJoinExpr.unwrapSlotRef();
if (leftSlot.getTable() instanceof OlapTable) {
// the table name in a SlotRef is not the real table name: for `select * from test as t`
// the table name in the SlotRef is `t`, but what we need here is `test`.
leftJoinColumnNames.add(leftSlot.getTable().getName() + "." + leftSlot.getColumnName());
rightExprs.add(rhsJoinExpr);
}
}

// 2. the join columns should contain all of the left table's distribute columns to enable bucket shuffle join
for (Column distributeColumn : leftDistributeColumns) {
int loc = leftJoinColumns.indexOf(distributeColumn);
// TODO: now support bucket shuffle join when distribute column type different with
// right expr type
if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
return false;
for (int i = 0; i < leftDistributeColumnNames.size(); i++) {
String distributeColumnName = leftDistributeColumnNames.get(i);
boolean findRhsExprs = false;
// check that the join column name is the same as the distribute column name and
// that the rhs join expr type is the same as the distribute column type
for (int j = 0; j < leftJoinColumnNames.size(); j++) {
if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
if (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) {
rhsJoinExprs.add(rightExprs.get(j));
findRhsExprs = true;
break;
}
}
}
rhsJoinExprs.add(rightExprs.get(loc));

if (!findRhsExprs) return false;
}
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,21 @@ public void testBucketShuffleJoin() throws Exception {
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`"));

// supports recursion of bucket shuffle join because t4 joins t2 and the join column names are the same as t2's distribute column names
queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
"on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and t4.k1 = t2.k2";
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));

// some column names in the join exprs of t3 join t4 differ from the distribute column names, so this should not be a bucket shuffle join
queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
"on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t3.k1 and t4.k2 = t3.k2";
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));

// disable bucket shuffle join again
Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
}
Expand Down