Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
Expand All @@ -40,7 +39,6 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
Expand Down Expand Up @@ -209,38 +207,12 @@ public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void c
return true;
}

private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) {
// improper to do bucket shuffle join:
// oneSide:
// 1. base table
// 2. single partition after pruning
// 3. tablets' number is small enough (< paraInstanceNum)
// otherSide: ShuffleType.EXECUTION_BUCKETED
private boolean isBucketShuffleDownGrade(DistributionSpecHash srcSideSpec) {
boolean isBucketShuffleDownGrade = ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade();
if (!isBucketShuffleDownGrade) {
return false;
} else if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED) {
return false;
} else {
int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
if (((GroupPlan) oneSidePlan).getGroup().getPhysicalExpressions().isEmpty()) {
return false;
} else {
Plan plan = ((GroupPlan) oneSidePlan).getGroup().getPhysicalExpressions().get(0).getPlan();
while ((plan instanceof PhysicalProject || plan instanceof PhysicalFilter)
&& !((GroupPlan) plan.child(0)).getGroup().getPhysicalExpressions().isEmpty()) {
plan = ((GroupPlan) plan.child(0)).getGroup().getPhysicalExpressions().get(0).getPlan();
}
if (plan != null && plan instanceof PhysicalOlapScan
&& ((PhysicalOlapScan) plan).getSelectedPartitionIds().size() <= 1
&& ((PhysicalOlapScan) plan).getTable() != null
&& ((PhysicalOlapScan) plan).getTable().getDefaultDistributionInfo() != null
&& ((PhysicalOlapScan) plan).getTable().getDefaultDistributionInfo().getBucketNum() < paraNum) {
return true;
} else {
return false;
}
}
return srcSideSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED;
}
}

Expand All @@ -262,9 +234,6 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec();
DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec();

Plan leftChild = hashJoin.child(0);
Plan rightChild = hashJoin.child(1);

// broadcast do not need regular
if (rightDistributionSpec instanceof DistributionSpecReplicated) {
return true;
Expand Down Expand Up @@ -296,7 +265,7 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
} else if (isBucketShuffleDownGrade(rightHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
Expand All @@ -305,7 +274,7 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
} else if (isBucketShuffleDownGrade(leftHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !ds_shape_13 --
PhysicalResultSink
--hashAgg[GLOBAL]
----PhysicalDistribute[DistributionSpecGather]
------hashAgg[LOCAL]
--------PhysicalProject
----------hashJoin[INNER_JOIN] hashCondition=((store.s_store_sk = store_sales.ss_store_sk)) otherCondition=() build RFs:RF4 ss_store_sk->[s_store_sk]
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------PhysicalOlapScan[store] apply RFs: RF4
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((customer_demographics.cd_demo_sk = store_sales.ss_cdemo_sk)) otherCondition=((((household_demographics.hd_dep_count = 1) AND ((((customer_demographics.cd_marital_status = 'D') AND (customer_demographics.cd_education_status = 'Primary')) AND ((store_sales.ss_sales_price >= 50.00) AND (store_sales.ss_sales_price <= 100.00))) OR (((customer_demographics.cd_marital_status = 'W') AND (customer_demographics.cd_education_status = '2 yr Degree')) AND ((store_sales.ss_sales_price >= 150.00) AND (store_sales.ss_sales_price <= 200.00))))) OR ((((customer_demographics.cd_marital_status = 'M') AND (customer_demographics.cd_education_status = 'College')) AND ((store_sales.ss_sales_price >= 100.00) AND (store_sales.ss_sales_price <= 150.00))) AND (household_demographics.hd_dep_count = 3)))) build RFs:RF3 ss_cdemo_sk->[cd_demo_sk]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------filter(((((customer_demographics.cd_marital_status = 'M') AND (customer_demographics.cd_education_status = 'College')) OR ((customer_demographics.cd_marital_status = 'D') AND (customer_demographics.cd_education_status = 'Primary'))) OR ((customer_demographics.cd_marital_status = 'W') AND (customer_demographics.cd_education_status = '2 yr Degree'))))
------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF3
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk)) otherCondition=() build RFs:RF2 hd_demo_sk->[ss_hdemo_sk]
------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ss_sold_date_sk]
--------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) otherCondition=((((ca_state IN ('IL', 'TN', 'TX') AND ((store_sales.ss_net_profit >= 100.00) AND (store_sales.ss_net_profit <= 200.00))) OR (ca_state IN ('ID', 'OH', 'WY') AND ((store_sales.ss_net_profit >= 150.00) AND (store_sales.ss_net_profit <= 300.00)))) OR (ca_state IN ('IA', 'MS', 'SC') AND ((store_sales.ss_net_profit >= 50.00) AND (store_sales.ss_net_profit <= 250.00))))) build RFs:RF0 ca_address_sk->[ss_addr_sk]
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------filter((store_sales.ss_net_profit <= 300.00) and (store_sales.ss_net_profit >= 50.00) and (store_sales.ss_sales_price <= 200.00) and (store_sales.ss_sales_price >= 50.00))
----------------------------------PhysicalOlapScan[store_sales] apply RFs: RF0 RF1 RF2
----------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------PhysicalProject
--------------------------------filter((customer_address.ca_country = 'United States') and ca_state IN ('IA', 'ID', 'IL', 'MS', 'OH', 'SC', 'TN', 'TX', 'WY'))
----------------------------------PhysicalOlapScan[customer_address]
--------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------PhysicalProject
------------------------------filter((date_dim.d_year = 2001))
--------------------------------PhysicalOlapScan[date_dim]
------------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------------PhysicalProject
----------------------------filter(hd_dep_count IN (1, 3))
------------------------------PhysicalOlapScan[household_demographics]

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !ds_shape_19 --
PhysicalResultSink
--PhysicalTopN[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------PhysicalProject
----------hashAgg[GLOBAL]
------------PhysicalDistribute[DistributionSpecHash]
--------------hashAgg[LOCAL]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=(( not (substring(ca_zip, 1, 5) = substring(s_zip, 1, 5)))) build RFs:RF4 s_store_sk->[ss_store_sk]
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF2 ss_customer_sk->[c_customer_sk]
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[customer] apply RFs: RF2
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = store_sales.ss_sold_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ss_sold_date_sk]
------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF0 i_item_sk->[ss_item_sk]
--------------------------------------PhysicalProject
----------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF0 RF1 RF4
--------------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------------PhysicalProject
------------------------------------------filter((item.i_manager_id = 14))
--------------------------------------------PhysicalOlapScan[item]
------------------------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------------------------PhysicalProject
----------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 2002))
------------------------------------------PhysicalOlapScan[date_dim]
--------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------PhysicalProject
------------------------PhysicalOlapScan[store]

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !ds_shape_44 --
PhysicalResultSink
--PhysicalTopN[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------PhysicalProject
----------hashJoin[INNER_JOIN] hashCondition=((asceding.rnk = descending.rnk)) otherCondition=()
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((i1.i_item_sk = asceding.item_sk)) otherCondition=() build RFs:RF1 item_sk->[i_item_sk]
------------------PhysicalProject
--------------------PhysicalOlapScan[item] apply RFs: RF1
------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------PhysicalProject
----------------------filter((rnk < 11))
------------------------PhysicalWindow
--------------------------PhysicalQuickSort[MERGE_SORT]
----------------------------PhysicalDistribute[DistributionSpecGather]
------------------------------PhysicalQuickSort[LOCAL_SORT]
--------------------------------PhysicalPartitionTopN
----------------------------------PhysicalProject
------------------------------------NestedLoopJoin[INNER_JOIN](cast(rank_col as DOUBLE) > cast((0.9 * rank_col) as DOUBLE))
--------------------------------------PhysicalProject
----------------------------------------hashAgg[GLOBAL]
------------------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------------------hashAgg[LOCAL]
----------------------------------------------PhysicalProject
------------------------------------------------filter((ss1.ss_store_sk = 4))
--------------------------------------------------PhysicalOlapScan[store_sales]
--------------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------------PhysicalProject
------------------------------------------PhysicalAssertNumRows
--------------------------------------------PhysicalDistribute[DistributionSpecGather]
----------------------------------------------PhysicalProject
------------------------------------------------hashAgg[GLOBAL]
--------------------------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------------------------hashAgg[LOCAL]
------------------------------------------------------PhysicalProject
--------------------------------------------------------filter((store_sales.ss_store_sk = 4) and ss_hdemo_sk IS NULL)
----------------------------------------------------------PhysicalOlapScan[store_sales]
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((i2.i_item_sk = descending.item_sk)) otherCondition=() build RFs:RF0 item_sk->[i_item_sk]
------------------PhysicalProject
--------------------PhysicalOlapScan[item] apply RFs: RF0
------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------PhysicalProject
----------------------filter((rnk < 11))
------------------------PhysicalWindow
--------------------------PhysicalQuickSort[MERGE_SORT]
----------------------------PhysicalDistribute[DistributionSpecGather]
------------------------------PhysicalQuickSort[LOCAL_SORT]
--------------------------------PhysicalPartitionTopN
----------------------------------PhysicalProject
------------------------------------NestedLoopJoin[INNER_JOIN](cast(rank_col as DOUBLE) > cast((0.9 * rank_col) as DOUBLE))
--------------------------------------PhysicalProject
----------------------------------------hashAgg[GLOBAL]
------------------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------------------hashAgg[LOCAL]
----------------------------------------------PhysicalProject
------------------------------------------------filter((ss1.ss_store_sk = 4))
--------------------------------------------------PhysicalOlapScan[store_sales]
--------------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------------PhysicalProject
------------------------------------------PhysicalAssertNumRows
--------------------------------------------PhysicalDistribute[DistributionSpecGather]
----------------------------------------------PhysicalProject
------------------------------------------------hashAgg[GLOBAL]
--------------------------------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------------------------------hashAgg[LOCAL]
------------------------------------------------------PhysicalProject
--------------------------------------------------------filter((store_sales.ss_store_sk = 4) and ss_hdemo_sk IS NULL)
----------------------------------------------------------PhysicalOlapScan[store_sales]

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !ds_shape_45 --
PhysicalResultSink
--PhysicalTopN[MERGE_SORT]
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------hashAgg[GLOBAL]
----------PhysicalDistribute[DistributionSpecHash]
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------filter((substring(ca_zip, 1, 5) IN ('80348', '81792', '83405', '85392', '85460', '85669', '86197', '86475', '88274') OR $c$1))
------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
--------------------PhysicalProject
----------------------PhysicalOlapScan[customer_address] apply RFs: RF3
--------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) otherCondition=() build RFs:RF2 ws_bill_customer_sk->[c_customer_sk]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[customer] apply RFs: RF2
--------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[ws_sold_date_sk]
------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF0 i_item_sk->[ws_item_sk]
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1
--------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------PhysicalProject
------------------------------------hashJoin[LEFT_SEMI_JOIN] hashCondition=((item.i_item_id = item.i_item_id)) otherCondition=()
--------------------------------------PhysicalProject
----------------------------------------PhysicalOlapScan[item]
--------------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------------PhysicalProject
------------------------------------------filter(i_item_sk IN (11, 13, 17, 19, 2, 23, 29, 3, 5, 7))
--------------------------------------------PhysicalOlapScan[item]
------------------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------------------PhysicalProject
----------------------------------filter((date_dim.d_qoy = 1) and (date_dim.d_year = 2000))
------------------------------------PhysicalOlapScan[date_dim]

Loading