[WIP] New version of join index rule #124
base: master
Question: is there any case that JoinRule v2 cannot cover but JoinRule v1 can? Could we replace v1 with v2?

The v1 rule works on both sides of a join simultaneously, so it can find better, more compatible indexes on both sides. There's one case where this matters: a multi-column join query. V1 will find an index pair whose columns are compatible; v2 may choose non-matching indexes. There's no guarantee that, for multi-column indexes, the most compatible index pair is selected.
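A minimal sketch of the multi-column pitfall described above. The `Index` case class and the position-by-position compatibility check are simplifications for illustration, not the actual Hyperspace index-selection logic:

```scala
// Hedged illustration: an index is represented only by its indexed-column
// order; two indexes are join-compatible here when their columns line up
// position-by-position under the join condition T1.a = T2.x AND T1.b = T2.y.
case class Index(indexedColumns: Seq[String])

val joinPairs = Seq("a" -> "x", "b" -> "y")

def compatible(l: Index, r: Index): Boolean =
  l.indexedColumns.zip(r.indexedColumns) == joinPairs

val left = Index(Seq("a", "b"))
val good = Index(Seq("x", "y"))
val bad  = Index(Seq("y", "x")) // same columns, incompatible bucketing order

println(compatible(left, good)) // true: v1, matching both sides together, picks this pair
println(compatible(left, bad))  // false: v2, choosing each side independently, may end up here
```

Because v2 selects each side's index in isolation, nothing forces the second side's bucketing order to match the first, which is the compatibility guarantee v1 provides.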
```scala
  private lazy val ruleBatch: Seq[Rule[LogicalPlan]] =
    sparkSession.conf.getOption(ENABLE_JOIN_RULE_V2) match {
      case Some(v) if v.toBoolean => JoinIndexRuleV2 +: hyperspaceOptimizationRuleBatch
```
Thanks for the explanation! Then it might be good to apply the rules in this order: JoinV1 => JoinV2 => Filter.
Oh, that's a fair point; we can discuss this.

The problem with keeping v1 before v2 is that it will optimize all lowest-level joins. It might optimize a broadcast join, replacing data nodes with index nodes. That makes the subplan ineligible for optimization by V2, which could otherwise exploit a more beneficial higher-level sort merge join, hurting overall performance in the majority of such cases.
but we can discuss the pros and cons and make a decision.
Good points.
It might optimize a broadcast join, and replace data nodes with index nodes.
If v1 applies indexes only for SMJ and SHJ, would it be OK? I have been thinking about moving our rules to look at `SparkPlan` similar to this instead of `LogicalPlan` - something we need to consider moving forward. For now, can we copy Spark's condition for detecting BHJ?
This will make the subplan ineligible for optimization by V2 and exploit a more beneficial higher level sort merge join
Can you elaborate on this a bit? Are you talking about this issue?
> Good points.
>
> It might optimize a broadcast join, and replace data nodes with index nodes.
>
> If v1 applies indexes only for SMJ and SHJ, would it be OK? I have been thinking about moving our rules to look at `SparkPlan` similar to this instead of `LogicalPlan` - something we need to consider moving forward. For now, can we copy Spark's condition for detecting BHJ?
yeah we can do that probably. A) disable support of BHJ completely, and B) remove the linearity check on both sides of a SMJ node. That should help achieve what you are suggesting.
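A rough sketch of what "copying the condition for BHJ" could look like, mirroring Spark's size-threshold heuristic. The function names, the size estimates, and the threshold plumbing are assumptions for illustration, not the actual Hyperspace implementation:

```scala
// Hedged sketch: Spark plans a BroadcastHashJoin when either side's estimated
// size fits under spark.sql.autoBroadcastJoinThreshold (a negative threshold
// disables broadcasting). A join rule could skip such joins by reproducing
// that check on the logical plan's size statistics.
def wouldBroadcast(sizeInBytes: BigInt, autoBroadcastThreshold: Long): Boolean =
  autoBroadcastThreshold >= 0 && sizeInBytes <= autoBroadcastThreshold

def isBroadcastJoin(leftSize: BigInt, rightSize: BigInt, threshold: Long): Boolean =
  wouldBroadcast(leftSize, threshold) || wouldBroadcast(rightSize, threshold)

val threshold = 10L * 1024 * 1024 // assume a 10MB broadcast threshold
println(isBroadcastJoin(BigInt(1L << 20), BigInt(1L << 40), threshold)) // true: 1MB side fits
println(isBroadcastJoin(BigInt(1L << 40), BigInt(1L << 40), threshold)) // false: both sides too large
```

One caveat with this approach: logical-plan size statistics are estimates, so this check can disagree with the join strategy Spark ultimately picks at physical planning time.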
This will make the subplan ineligible for optimization by V2 and exploit a more beneficial higher level sort merge join
Can you elaborate on this a bit? Are you talking about this issue?
Not exactly. What I mean is: if the original plan looks like

```
     SMJ (T1C1 = T2C2)
    /    \
  T1     BHJ (T2C3 = T3C3)
         /  \
       T2    T3
```
In this case, it's more beneficial to optimize the SMJ: T1C1 = T2C2 and pick index on T2C2.
But if we apply Join V1 first, we optimize the BHJ first, and T2C3 will be picked for partitioning. Once T2 is replaced with the index on T2C3, that side of the plan cannot be "optimized" by JoinV2 (using the index on T2C2).
JoinV2 will still be able to optimize the T1 side of the plan.
Got it. But if T2C2 is the build side (not the stream side), you will lose the output partitioning without this, no?
> Got it. But if `T2C2` is the build side (not the stream side), you will lose the output partitioning without this, no?
Oh yes, that is correct. Hmm, this is interesting.
In that case we will still pick the T2C2 index and will not be able to optimize either the SMJ (because of lost output partitioning) or the BHJ (because we picked the T2C2 index via the v2 rule).
Actually, now I am not sure what we should aim for here.
@apoorvedave1 @imback82 cc: @rapoth
Can we merge this change in 0.4.0 with the applying order "Join Rule v1 => Join Rule v2 => Filter Index Rule"? I think we can optimize the interplay between v1 & v2 later.
Since Filter Index Rule doesn't use BucketSpec, the current version causes unnecessary shuffle of index data - without Join Rule v2.
I'm certain that Join Rule v2 is at least better than Filter Index Rule if applicable.
What do you think?
# Conflicts:
#	src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
#	src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
```diff
  // compared to filter indexes.
- private val hyperspaceOptimizationRuleBatch = JoinIndexRule :: FilterIndexRule :: Nil
+ private val hyperspaceOptimizationRuleBatch =
+   JoinIndexRuleV2 :: JoinIndexRule :: FilterIndexRule :: Nil
```
previous discussion: #124 (comment)
I think this could be `JoinIndexRule :: JoinIndexRuleV2 :: FilterIndexRule`, as JoinIndexRule could remove unnecessary shuffle if possible.
And I think we could check BHJ for JoinV1 using `isBroadcastJoin()`.
Could you create an issue for the v1 v2 rule optimization (ordering or merging) and add todo comment here? Thanks!
…uleV2.scala Co-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com>
```scala
// t1   BHJ(t2c2 = t3c2)
//      /  \
//    t2    t3
// When V2 is run, this should optimize indexes for t1 and t2 based on join condition in SMJ
```
What if it's SMJ - SMJ? What if there is a node with different output partitioning (like Union) in the middle of the left or right side? Do we need to check whether the bucketing is preserved or not?

```
   SMJ
  /   \
t1    Union
      /  \
    t2    t3
```
Added a test case in the E2E test to explain behavior in such a scenario. In summary, the lower SMJ will be favoured over the higher-level SMJ.
Since I am replacing only the scan node with the index (or a Union in case of hybrid scan), I think the ensure-requirements phase will introduce bucketing when required. Could you please give an example where this could fail? I couldn't come up with one.
I meant the SMJ-SMJ case: what if the lower SMJ isn't eligible but requires a shuffle with different columns?
And for Union, I edited the above comment.
Since v1 has the isLinear check function, it's not a problem with v1, but I wonder how it is handled in v2.
I meant - SMJ-SMJ case, if the lower SMJ isn't eligible? But it requires shuffle w/ different columns?
Oh, got it. Then for the lower SMJ, an index won't be picked for any relation. If one of those relations is eligible for the higher SMJ, it will be replaced by its index. I think this case is similar to a lower-level BHJ scenario where the build side is the side with supported index columns in the higher-level SMJ.
cc @imback82
And for Union, I edited the above comment.
Since v1 has isLinear check function, it's not a problem with v1, but I wonder how it is handled in v2.
Union relations will remain untouched if the join columns used in higher level SMJ belong to Union node. If it belongs to only one of the tables t2 or t3, it will be replaced with the index. But if both t2 and t3 have some columns used in SMJ, we won't replace with index at all.
Bucketing info will not travel up the Union node because the other side of union is not bucketed.
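For context, the v1 linearity check mentioned here can be sketched roughly like this. `Node` is a stand-in for Spark's `LogicalPlan` tree, and the check itself is a simplified reconstruction, not the actual Hyperspace code:

```scala
// Hedged sketch: a subplan is "linear" when every node has at most one child,
// so bucketing/partitioning from the leaf relation can travel up unchanged.
// A Union (two children) breaks linearity, which is why v1 skips such plans.
case class Node(name: String, children: Seq[Node] = Seq.empty)

def isLinear(plan: Node): Boolean =
  plan.children.length <= 1 && plan.children.forall(isLinear)

val scan   = Node("Scan t2")
val filter = Node("Filter", Seq(scan)) // linear chain: Filter -> Scan
val union  = Node("Union", Seq(Node("Scan t2"), Node("Scan t3")))

println(isLinear(filter)) // true
println(isLinear(union))  // false: Union merges two branches, so bucketing is lost
```

This matches the observation above: because the other side of a Union is not bucketed, the bucketing info cannot travel up through the Union node, and a linearity check would conservatively rule such plans out.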
So in those cases, there should be a shuffle node even with the index, right?
I'm not sure if it's a good idea to apply indexes for them.
And if Hybrid Scan is applied for these ineffective indexes, there should be some performance regression:
- shuffle for BucketUnion (for hybrid scan, which is unnecessary)
- shuffle for join (higher SMJ)
Other than this, the change generally LGTM.
But

> shuffle for join (for higher SMJ) AND shuffle for BucketUnion (for Hybrid Scan)

will cause degradation as the index is ineffective. Can we add a linearity check at least for now? @imback82 @pirz
I don't think there will be another shuffle for the higher SMJ. There is only a shuffle within the BucketUnion, as per my understanding of hybrid scan. Again, the index will not be ineffective in this case.
If there is a BucketUnion, it means the Join Rule has added it, and it was added ONLY IF the children of the BucketUnion node have the same partitioning as the higher-level SMJ. Is that not correct? And if it is, there will NOT be any additional shuffle node except the one within the BucketUnion node.
I thought not having the linearity check was helping a few queries in TPC-DS where output partitioning from a lower node needs to travel up? I wonder what would happen to the perf gain if we change this. I guess we need the TPC-* test framework sooner rather than later to iterate quickly.
@apoorvedave1 Could you compile a list of concerns in #237 so we can better track (if it's missing any of the worthy discussion)?
> And it was added ONLY IF the children of the BucketUnion node have the same partitioning as the higher-level SMJ. Is that not correct? And if it is, there will NOT be any additional shuffle node except the one within the BucketUnion node.

In case the lower-level SMJ doesn't retain the bucketing spec, there will be a shuffle for the higher-level SMJ, and then the shuffle for BucketUnion (for the higher-level SMJ) will be a cause of perf regression. WDYT?
I really got confused now. Could you create another diagram please? it looks like you have 2 SMJs and a Bucket union.
Ok, for example:

```
     SMJ1
    /    \  (shuffle for SMJ1)
  t1     SMJ2 (not eligible for index)
         /    \
    shuffle  shuffle
       /        \
     t2          t3
```

will be transformed into:

```
     SMJ1 (eligible for index, higher level SMJ)
    /    \  (shuffle for SMJ1)
  t1     SMJ2 (not eligible for index, lower level SMJ, different bucketing columns)
         /               \
  shuffle for SMJ2     shuffle
       /                   \
bucket union for SMJ1       t3
    /        \
t2 idx    shuffle for SMJ1
               \
           t2 appended
```
pirz
left a comment
LGTM, Thanks @apoorvedave1 :)
Could you update this with a concrete example for others?

Could you fix the build error? Let's merge this PR for the release. Thanks!
```scala
    !isBroadcastJoin(l, r) && isJoinConditionSupported(condition)
  }

  private def isBroadcastJoin(l: LogicalPlan, r: LogicalPlan): Boolean = {
```
Seems this doesn't work as expected.
Logical Plan (tpcds q91):
+- 'Aggregate ['cc_call_center_id, 'cc_name, 'cc_manager, 'cd_marital_status, 'cd_education_status], ['cc_call_center_id AS Call_Center#4158, 'cc_name AS Call_Center_Name#4159, 'cc_manager AS Manager#4160, 'sum('cr_net_loss) AS Returns_Loss#4161]
+- 'Filter ((((('cr_call_center_sk = 'cc_call_center_sk) && ('cr_returned_date_sk = 'd_date_sk)) && ('cr_returning_customer_sk = 'c_customer_sk)) && ((('cd_demo_sk = 'c_current_cdemo_sk) && ('hd_demo_sk = 'c_current_hdemo_sk)) && ('ca_address_sk = 'c_current_addr_sk))) && (((('d_year = 1998) && ('d_moy = 11)) && ((('cd_marital_status = M) && ('cd_education_status = Unknown)) || (('cd_marital_status = W) && ('cd_education_status = Advanced Degree)))) && ('hd_buy_potential LIKE Unknown% && ('ca_gmt_offset = -7))))
+- 'Join Inner
:- 'Join Inner
: :- 'Join Inner
: : :- 'Join Inner
: : : :- 'Join Inner
: : : : :- 'Join Inner
: : : : : :- 'UnresolvedRelation `call_center`
: : : : : +- 'UnresolvedRelation `catalog_returns`
: : : : +- 'UnresolvedRelation `date_dim`
: : : +- 'UnresolvedRelation `customer`
: : +- 'UnresolvedRelation `customer_address`
: +- 'UnresolvedRelation `customer_demographics`
+- 'UnresolvedRelation `household_demographics`
physical plan using v2:
== Physical Plan ==
*(11) Sort [Returns_Loss#4161 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(Returns_Loss#4161 DESC NULLS LAST, 200)
+- *(10) HashAggregate(keys=[cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236], functions=[sum(cr_net_loss#111)], output=[Call_Center#4158, Call_Center_Name#4159, Manager#4160, Returns_Loss#4161])
+- Exchange hashpartitioning(cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236, 200)
+- *(9) HashAggregate(keys=[cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236], functions=[partial_sum(cr_net_loss#111)], output=[cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236, sum#4174])
+- *(9) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, cd_marital_status#235, cd_education_status#236]
+- *(9) BroadcastHashJoin [c_current_hdemo_sk#254L], [hd_demo_sk#343L], Inner, BuildRight
:- *(9) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, c_current_hdemo_sk#254L, cd_marital_status#235, cd_education_status#236]
: +- *(9) BroadcastHashJoin [c_current_cdemo_sk#253L], [cd_demo_sk#233L], Inner, BuildRight
: :- *(9) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, c_current_cdemo_sk#253L, c_current_hdemo_sk#254L]
: : +- *(9) BroadcastHashJoin [c_current_addr_sk#255L], [ca_address_sk#207L], Inner, BuildRight
: : :- *(9) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, c_current_cdemo_sk#253L, c_current_hdemo_sk#254L, c_current_addr_sk#255L]
: : : +- *(9) SortMergeJoin [cr_returning_customer_sk#92L], [c_customer_sk#251L], Inner
: : : :- *(4) Sort [cr_returning_customer_sk#92L ASC NULLS FIRST], false, 0
: : : : +- Exchange hashpartitioning(cr_returning_customer_sk#92L, 200)
: : : : +- *(3) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_returning_customer_sk#92L, cr_net_loss#111]
: : : : +- *(3) BroadcastHashJoin [cr_returned_date_sk#85L], [d_date_sk#287L], Inner, BuildRight
: : : : :- *(3) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_returned_date_sk#85L, cr_returning_customer_sk#92L, cr_net_loss#111]
: : : : : +- *(3) BroadcastHashJoin [cc_call_center_sk#5L], [cr_call_center_sk#96L], Inner, BuildLeft
: : : : : :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: : : : : : +- *(1) Project [cc_call_center_sk#5L, cc_call_center_id#6, cc_name#11, cc_manager#16]
: : : : : : +- *(1) Filter isnotnull(cc_call_center_sk#5L)
: : : : : : +- *(1) FileScan parquet [cc_call_center_sk#5L,cc_call_center_id#6,cc_name#11,cc_manager#16] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(cc_call_center_sk)], ReadSchema: struct<cc_call_center_sk:bigint,cc_call_center_id:string,cc_name:string,cc_manager:string>
: : : : : +- *(3) Project [cr_returned_date_sk#85L, cr_returning_customer_sk#92L, cr_call_center_sk#96L, cr_net_loss#111]
: : : : : +- *(3) Filter ((isnotnull(cr_call_center_sk#96L) && isnotnull(cr_returned_date_sk#85L)) && isnotnull(cr_returning_customer_sk#92L))
: : : : : +- *(3) FileScan parquet [cr_returned_date_sk#85L,cr_returning_customer_sk#92L,cr_call_center_sk#96L,cr_net_loss#111] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(cr_call_center_sk), IsNotNull(cr_returned_date_sk), IsNotNull(cr_returning_customer_sk)], ReadSchema: struct<cr_returned_date_sk:bigint,cr_returning_customer_sk:bigint,cr_call_center_sk:bigint,cr_net...
: : : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: : : : +- *(2) Project [d_date_sk#287L]
: : : : +- *(2) Filter ((((isnotnull(d_year#293) && isnotnull(d_moy#295)) && (d_year#293 = 1998)) && (d_moy#295 = 11)) && isnotnull(d_date_sk#287L))
: : : : INDEX +- *(2) FileScan parquet [d_year#293,d_date_sk#287L,d_moy#295] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/indexes/tpcds..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_moy), EqualTo(d_year,1998), EqualTo(d_moy,11), IsNotNull(d_date_sk)], ReadSchema: struct<d_year:int,d_date_sk:bigint,d_moy:int>
: : : +- *(5) Project [c_customer_sk#251L, c_current_cdemo_sk#253L, c_current_hdemo_sk#254L, c_current_addr_sk#255L]
: : : +- *(5) Filter (((isnotnull(c_customer_sk#251L) && isnotnull(c_current_addr_sk#255L)) && isnotnull(c_current_cdemo_sk#253L)) && isnotnull(c_current_hdemo_sk#254L))
: : : INDEX +- *(5) FileScan parquet [c_customer_sk#251L,c_current_addr_sk#255L,c_current_cdemo_sk#253L,c_current_hdemo_sk#254L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/indexes/tpcds..., PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk), IsNotNull..., ReadSchema: struct<c_customer_sk:bigint,c_current_addr_sk:bigint,c_current_cdemo_sk:bigint,c_current_hdemo_sk..., SelectedBucketsCount: 200 out of 200
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: : +- *(6) Project [ca_address_sk#207L]
: : +- *(6) Filter ((isnotnull(ca_gmt_offset#218) && (ca_gmt_offset#218 = -7.0)) && isnotnull(ca_address_sk#207L))
: : INDEX +- *(6) FileScan parquet [ca_address_sk#207L,ca_gmt_offset#218] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/indexes/tpcds..., PartitionFilters: [], PushedFilters: [IsNotNull(ca_gmt_offset), EqualTo(ca_gmt_offset,-7.0), IsNotNull(ca_address_sk)], ReadSchema: struct<ca_address_sk:bigint,ca_gmt_offset:double>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: +- *(7) Project [cd_demo_sk#233L, cd_marital_status#235, cd_education_status#236]
: +- *(7) Filter ((((cd_marital_status#235 = M) && (cd_education_status#236 = Unknown)) || ((cd_marital_status#235 = W) && (cd_education_status#236 = Advanced Degree))) && isnotnull(cd_demo_sk#233L))
: INDEX +- *(7) FileScan parquet [cd_demo_sk#233L,cd_education_status#236,cd_marital_status#235] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/indexes/tpcds..., PartitionFilters: [], PushedFilters: [Or(And(EqualTo(cd_marital_status,M),EqualTo(cd_education_status,Unknown)),And(EqualTo(cd_marital..., ReadSchema: struct<cd_demo_sk:bigint,cd_education_status:string,cd_marital_status:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(8) Project [hd_demo_sk#343L]
+- *(8) Filter ((isnotnull(hd_buy_potential#345) && StartsWith(hd_buy_potential#345, Unknown)) && isnotnull(hd_demo_sk#343L))
INDEX +- *(8) FileScan parquet [hd_demo_sk#343L,hd_buy_potential#345] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/indexes/tpcds..., PartitionFilters: [], PushedFilters: [IsNotNull(hd_buy_potential), StringStartsWith(hd_buy_potential,Unknown), IsNotNull(hd_demo_sk)], ReadSchema: struct<hd_demo_sk:bigint,hd_buy_potential:string>
physical plan - disabled hyperspace
*(12) Sort [Returns_Loss#55288 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(Returns_Loss#55288 DESC NULLS LAST, 200)
+- *(11) HashAggregate(keys=[cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236], functions=[sum(cr_net_loss#111)], output=[Call_Center#55285, Call_Center_Name#55286, Manager#55287, Returns_Loss#55288])
+- Exchange hashpartitioning(cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236, 200)
+- *(10) HashAggregate(keys=[cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236], functions=[partial_sum(cr_net_loss#111)], output=[cc_call_center_id#6, cc_name#11, cc_manager#16, cd_marital_status#235, cd_education_status#236, sum#55301])
+- *(10) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, cd_marital_status#235, cd_education_status#236]
+- *(10) BroadcastHashJoin [c_current_hdemo_sk#254L], [hd_demo_sk#343L], Inner, BuildRight
:- *(10) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, c_current_hdemo_sk#254L, cd_marital_status#235, cd_education_status#236]
: +- *(10) BroadcastHashJoin [c_current_cdemo_sk#253L], [cd_demo_sk#233L], Inner, BuildRight
: :- *(10) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, c_current_cdemo_sk#253L, c_current_hdemo_sk#254L]
: : +- *(10) BroadcastHashJoin [c_current_addr_sk#255L], [ca_address_sk#207L], Inner, BuildRight
: : :- *(10) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_net_loss#111, c_current_cdemo_sk#253L, c_current_hdemo_sk#254L, c_current_addr_sk#255L]
: : : +- *(10) SortMergeJoin [cr_returning_customer_sk#92L], [c_customer_sk#251L], Inner
: : : :- *(4) Sort [cr_returning_customer_sk#92L ASC NULLS FIRST], false, 0
: : : : +- Exchange hashpartitioning(cr_returning_customer_sk#92L, 200)
: : : : +- *(3) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_returning_customer_sk#92L, cr_net_loss#111]
: : : : +- *(3) BroadcastHashJoin [cr_returned_date_sk#85L], [d_date_sk#287L], Inner, BuildRight
: : : : :- *(3) Project [cc_call_center_id#6, cc_name#11, cc_manager#16, cr_returned_date_sk#85L, cr_returning_customer_sk#92L, cr_net_loss#111]
: : : : : +- *(3) BroadcastHashJoin [cc_call_center_sk#5L], [cr_call_center_sk#96L], Inner, BuildLeft
: : : : : :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: : : : : : +- *(1) Project [cc_call_center_sk#5L, cc_call_center_id#6, cc_name#11, cc_manager#16]
: : : : : : +- *(1) Filter isnotnull(cc_call_center_sk#5L)
: : : : : : +- *(1) FileScan parquet [cc_call_center_sk#5L,cc_call_center_id#6,cc_name#11,cc_manager#16] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(cc_call_center_sk)], ReadSchema: struct<cc_call_center_sk:bigint,cc_call_center_id:string,cc_name:string,cc_manager:string>
: : : : : +- *(3) Project [cr_returned_date_sk#85L, cr_returning_customer_sk#92L, cr_call_center_sk#96L, cr_net_loss#111]
: : : : : +- *(3) Filter ((isnotnull(cr_call_center_sk#96L) && isnotnull(cr_returned_date_sk#85L)) && isnotnull(cr_returning_customer_sk#92L))
: : : : : +- *(3) FileScan parquet [cr_returned_date_sk#85L,cr_returning_customer_sk#92L,cr_call_center_sk#96L,cr_net_loss#111] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(cr_call_center_sk), IsNotNull(cr_returned_date_sk), IsNotNull(cr_returning_customer_sk)], ReadSchema: struct<cr_returned_date_sk:bigint,cr_returning_customer_sk:bigint,cr_call_center_sk:bigint,cr_net...
: : : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: : : : +- *(2) Project [d_date_sk#287L]
: : : : +- *(2) Filter ((((isnotnull(d_year#293) && isnotnull(d_moy#295)) && (d_year#293 = 1998)) && (d_moy#295 = 11)) && isnotnull(d_date_sk#287L))
: : : : +- *(2) FileScan parquet [d_date_sk#287L,d_year#293,d_moy#295] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_moy), EqualTo(d_year,1998), EqualTo(d_moy,11), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:bigint,d_year:int,d_moy:int>
: : : +- *(6) Sort [c_customer_sk#251L ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(c_customer_sk#251L, 200)
: : : +- *(5) Project [c_customer_sk#251L, c_current_cdemo_sk#253L, c_current_hdemo_sk#254L, c_current_addr_sk#255L]
: : : +- *(5) Filter (((isnotnull(c_customer_sk#251L) && isnotnull(c_current_addr_sk#255L)) && isnotnull(c_current_cdemo_sk#253L)) && isnotnull(c_current_hdemo_sk#254L))
: : : +- *(5) FileScan parquet [c_customer_sk#251L,c_current_cdemo_sk#253L,c_current_hdemo_sk#254L,c_current_addr_sk#255L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk), IsNotNull..., ReadSchema: struct<c_customer_sk:bigint,c_current_cdemo_sk:bigint,c_current_hdemo_sk:bigint,c_current_addr_sk...
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: : +- *(7) Project [ca_address_sk#207L]
: : +- *(7) Filter ((isnotnull(ca_gmt_offset#218) && (ca_gmt_offset#218 = -7.0)) && isnotnull(ca_address_sk#207L))
: : +- *(7) FileScan parquet [ca_address_sk#207L,ca_gmt_offset#218] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(ca_gmt_offset), EqualTo(ca_gmt_offset,-7.0), IsNotNull(ca_address_sk)], ReadSchema: struct<ca_address_sk:bigint,ca_gmt_offset:double>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
: +- *(8) Project [cd_demo_sk#233L, cd_marital_status#235, cd_education_status#236]
: +- *(8) Filter ((((cd_marital_status#235 = M) && (cd_education_status#236 = Unknown)) || ((cd_marital_status#235 = W) && (cd_education_status#236 = Advanced Degree))) && isnotnull(cd_demo_sk#233L))
: +- *(8) FileScan parquet [cd_demo_sk#233L,cd_marital_status#235,cd_education_status#236] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [Or(And(EqualTo(cd_marital_status,M),EqualTo(cd_education_status,Unknown)),And(EqualTo(cd_marital..., ReadSchema: struct<cd_demo_sk:bigint,cd_marital_status:string,cd_education_status:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(9) Project [hd_demo_sk#343L]
+- *(9) Filter ((isnotnull(hd_buy_potential#345) && StartsWith(hd_buy_potential#345, Unknown)) && isnotnull(hd_demo_sk#343L))
+- *(9) FileScan parquet [hd_demo_sk#343L,hd_buy_potential#345] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://teststorage@testtest.blob.core.windows.net/data/tpcds-1T..., PartitionFilters: [], PushedFilters: [IsNotNull(hd_buy_potential), StringStartsWith(hd_buy_potential,Unknown), IsNotNull(hd_demo_sk)], ReadSchema: struct<hd_demo_sk:bigint,hd_buy_potential:string>
Result - hyperspace enabled (only ran query 91)
***** Query Execution Times *****
q91 20949 10861 10390
***** Query Explain Times *****
q91 6498 2854 2326
Result - hyperspace disabled
***** Query Execution Times *****
q91 14426 8872 7868
***** Query Explain Times *****
q91 717 415 460
enabled / disabled
# of BroadCastJoin: 5 / 5
# of SortMergeJoin: 1 / 1
# number of Exchange: 3 / 4
@imback82 @apoorvedave1 I added the physical plan with Hyperspace disabled and the query execution/explain times.
I cannot see any difference in the plan except for one removed Exchange.
Maybe it's because of explain time, but I have no idea why it takes longer.
In any case, this shows the isBroadcastJoin function doesn't work properly.
Other queries also show regressions, e.g.:
Result - hyperspace enabled (only ran 1 query)
***** Query Execution Times *****
q21 18933 15805 15275
***** Query Explain Times *****
q21 6218 1554 1730
Result - disabled hyperspace
***** Query Execution Times *****
q21 13479 10073 8741
***** Query Explain Times *****
q21 483 359 225
I'll share the plans for q21 if needed.
I think we need to make sure there's no regression from this rule, at least for tpcds testset, non-hybrid scan.
@apoorvedave1 Could you measure the perf and add required constraints to avoid these regressions? Thanks!
What changes were proposed in this pull request?

Introducing a new optimizer rule `JoinIndexRuleV2` for optimizing join queries. This rule is introduced to overcome the limitations of the original `JoinIndexRule`.

Why was this change introduced?

JoinIndexRule:
JoinIndexRuleV2:

To enable JoinIndexRuleV2: set the spark conf `"spark.hyperspace.enableJoinRuleV2"` to `true`.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests added.
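For reference, a minimal snippet to flip the conf switch named in the description above. It assumes an already-running `SparkSession` called `spark`; the conf key is taken from this PR, but this usage sketch is otherwise untested here:

```scala
// Hedged usage sketch: enable the new v2 join rule on an existing session.
spark.conf.set("spark.hyperspace.enableJoinRuleV2", "true")
```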
Notes From Comments:
Thanks @sezruby,

V1 rule works on both sides of a join simultaneously, so it can find better, more compatible indexes on both sides.
V2 works independently on either side and so may not find the most compatible pair of indexes.
There's one case where this matters: a multi-column join query.
V1 will find an index pair where the columns are compatible.
V2 may choose non-matching indexes like this.
There's no guarantee that for multi-column indexes, the most compatible index pair is selected.