
Conversation

@sezruby
Collaborator

@sezruby sezruby commented Sep 15, 2020

What changes were proposed in this pull request?

This PR allows users to use Hybrid Scan for append-only datasets.

In order to support Hybrid Scan for an append-only dataset, we need to merge the newly appended files and the index data properly. Currently we have the following cases:

  • Case 1) Filter Index Rule & parquet source format
    • In this case, we can simply add the list of appended files to the file list of the index relation (i.e. the index data), because:
      • newly appended source files are guaranteed to have all the columns in the index data (except for lineage).
      • the Filter Index Rule does not utilize bucketing information for now, so both the index data and the newly appended data can be read with a single FileScan node. See the InMemoryFileIndex below:
...
<----:- *(1) Project [name#1, id#0]---->
<----:  +- *(1) Filter (isnotnull(id#0) && (id#0 >= 1))---->
<----:     +- *(1) FileScan parquet [id#0,name#1] Batched: true, Format: Parquet, 
                     Location: InMemoryFileIndex[<list of index data files> , <list of newly appended files>], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>---->
...
  • Case 3) Join Index Rule: the appended data is shuffled on the fly using the index's bucket spec and merged with the bucketed index data via BucketUnion, as in the plan below:

 +- BucketUnion 200 buckets, bucket columns: [l_orderkey]      <===== merge both plans
            :- Project [l_orderkey#21L]                        <===== original index plan
            :  +- Filter ((isnotnull(l_commitdate#32) && isnotnull(l_receiptdate#33)) && (l_commitdate#32 < l_receiptdate#33))
            :     +- Relation[l_orderkey#21L,l_partkey#22L,l_suppkey#23L,l_quantity#25,l_extendedprice#26,l_discount#27,l_returnflag#29,l_shipdate#31,l_commitdate#32,l_receiptdate#33,l_shipmode#35] parquet
            +- RepartitionByExpression [l_orderkey#21L], 200   <===== on-the-fly shuffle with index spec
               +- Project [l_orderkey#21L]                     <===== newly appended data
                  +- Filter ((isnotnull(l_commitdate#32) && isnotnull(l_receiptdate#33)) && (l_commitdate#32 < l_receiptdate#33))
                     +- Relation[l_orderkey#21L,l_partkey#22L,l_suppkey#23L,l_quantity#25,l_extendedprice#26,l_discount#27,l_returnflag#29,l_shipdate#31,l_commitdate#32,l_receiptdate#33,l_shipmode#35] parquet
  • transformPlanToUseIndex returns the transformed plan that utilizes the given index.
  • transformPlanToUsePureIndex: if hybridScanEnabled=false, it replaces the source location with the index data location, as before.
  • transformPlanToUseHybridIndexDataScan: if hybridScanEnabled=true, it first creates the plan with the index location, similar to transformPlanToUsePureIndex,
    • and if there is appended data,
      • for Case 1, it adds the list of appended files to the location along with the index data files and returns.
      • for Case 3, it creates an on-the-fly shuffle for the appended data and merges it with the index data via BucketUnion. (A rough sketch of this branching follows the list.)
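
A rough sketch of this branching, with hypothetical helper names (scanIndexOnly, scanIndexAndAppended, shuffleOnIndexedColumns and bucketUnion are placeholders for illustration, not the PR's actual API):

// Hypothetical sketch: how the hybrid-scan plan could be assembled for Case 1 vs. Case 3.
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Placeholder helpers, assumed for illustration only.
def scanIndexOnly(indexDataFiles: Seq[String]): LogicalPlan = ???
def scanIndexAndAppended(indexDataFiles: Seq[String], appendedFiles: Seq[String]): LogicalPlan = ???
def shuffleOnIndexedColumns(appendedPlan: LogicalPlan, numBuckets: Int): LogicalPlan = ???
def bucketUnion(indexPlan: LogicalPlan, shuffledAppendedPlan: LogicalPlan, numBuckets: Int): LogicalPlan = ???

def hybridScanPlan(
    indexDataFiles: Seq[String],
    appendedFiles: Seq[String],
    appendedPlan: => LogicalPlan,
    usesBucketing: Boolean, // false: Filter Index Rule + parquet (Case 1), true: Join Index Rule (Case 3)
    numBuckets: Int): LogicalPlan = {
  if (appendedFiles.isEmpty) {
    // No appended data: pure index scan, same as before.
    scanIndexOnly(indexDataFiles)
  } else if (!usesBucketing) {
    // Case 1: read index data and appended parquet files with a single FileScan.
    scanIndexAndAppended(indexDataFiles, appendedFiles)
  } else {
    // Case 3: shuffle the appended data with the index's bucket spec, then merge via BucketUnion.
    val shuffled = shuffleOnIndexedColumns(appendedPlan, numBuckets)
    bucketUnion(scanIndexOnly(indexDataFiles), shuffled, numBuckets)
  }
}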

Why are the changes needed?

To support Hybrid Scan for append-only datasets (#150).

Does this PR introduce any user-facing change?

Yes. If a user turns on Hybrid Scan (spark.hyperspace.index.hybridscan.enabled=true), outdated indexes whose source dataset has newly appended files become candidates for both FilterIndexRule and JoinIndexRule.
The query plan is modified in the optimizer to support this, and the changed plan can be checked with the hs.explain() API.
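
For example, a user would enable Hybrid Scan and inspect the transformed plan as follows (the path, query, and index below are illustrative only; the config name and hs.explain() come from this PR):

import com.microsoft.hyperspace.Hyperspace

spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")

val hs = new Hyperspace(spark)
val df = spark.read.parquet("/path/to/table") // source dataset that has newly appended files
val query = df.filter("id = 1").select("id", "name")
hs.explain(query) // prints the plan with indexes, the plan without indexes, and the indexes used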

FilterRule - Case1

scala> hs.explain(query)
=============================================================
Plan with indexes:
=============================================================
Project [id#0, name#1]
+- Filter (isnotnull(id#0) && (id#0 = 1))
   <----+- FileScan parquet [id#0,name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>---->

=============================================================
Plan without indexes:
=============================================================
Project [id#0, name#1]
+- Filter (isnotnull(id#0) && (id#0 = 1))
   <----+- FileScan parquet [id#0,name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>---->

=============================================================
Indexes used:
=============================================================
index33:file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=0

FilterRule - Case2

=============================================================
Plan with indexes:
=============================================================
<----Union---->
<----:- *(1) Project [id#75L, name#76]---->
<----:  +- *(1) Filter (isnotnull(id#75L) && (id#75L = 1))---->
<----:     +- *(1) FileScan parquet [id#75L,name#76] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/indexjj/v__=..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1)], ReadSchema: struct<id:bigint,name:string>, SelectedBucketsCount: 1 out of 200---->
<----+- *(2) Project [id#75L, name#76]---->
   <----+- *(2) Filter (isnotnull(id#75L) && (id#75L = 1))---->
      <----+- *(2) FileScan json [id#75L,name#76] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/tablej/part-00000-b6853714-dc1b-450b..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1)], ReadSchema: struct<id:bigint,name:string>---->

JoinRule - BroadcastHashJoin

=============================================================
Plan with indexes:
=============================================================
Project [id#0, name#1, name#40]
+- BroadcastHashJoin [id#0], [id#39], Inner, BuildRight
   <----:- BucketUnion 200 buckets, bucket columns: [id]---->
   <----:  :- *(1) Project [id#0, name#1]---->
   <----:  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = 1)) && (id#0 >= 1))---->
   <----:  :     +- *(1) FileScan parquet [id#0,name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>, SelectedBucketsCount: 1 out of 200---->
   <----:  +- Exchange hashpartitioning(id#0, 200)---->
   <----:     +- *(2) Project [id#0, name#1]---->
   <----:        +- *(2) Filter ((isnotnull(id#0) && (id#0 = 1)) && (id#0 >= 1))---->
   <----:           +- *(2) FileScan parquet [id#0,name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table/part-00003-0fc50086-fd6e-4527-..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>---->
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      <----+- BucketUnion 200 buckets, bucket columns: [id]---->
         <----:- *(3) Project [id#39, name#40]---->
         <----:  +- *(3) Filter ((isnotnull(id#39) && (id#39 >= 1)) && (id#39 = 1))---->
         <----:     +- *(3) FileScan parquet [id#39,name#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>, SelectedBucketsCount: 1 out of 200---->
         <----+- Exchange hashpartitioning(id#39, 200)---->
            <----+- *(4) Project [id#39, name#40]---->
               <----+- *(4) Filter ((isnotnull(id#39) && (id#39 >= 1)) && (id#39 = 1))---->
                  <----+- *(4) FileScan parquet [id#39,name#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table/part-00003-0fc50086-fd6e-4527-..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>---->

=============================================================
Plan without indexes:
=============================================================
Project [id#0, name#1, name#40]
+- BroadcastHashJoin [id#0], [id#39], Inner, BuildRight
   <----:- Project [id#0, name#1]---->
   <----:  +- Filter ((isnotnull(id#0) && (id#0 = 1)) && (id#0 >= 1))---->
   <----:     +- FileScan parquet [id#0,name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>---->
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      <----+- *(1) Project [id#39, name#40]---->
         <----+- *(1) Filter ((isnotnull(id#39) && (id#39 >= 1)) && (id#39 = 1))---->
            <----+- *(1) FileScan parquet [id#39,name#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>---->

=============================================================
Indexes used:
=============================================================
index33:file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=0

JoinRule - SortMergeJoin

=============================================================
Plan with indexes:
=============================================================
Project [id#69, name#70, name#103]
+- SortMergeJoin [id#69], [id#102], Inner
   :- *(3) Sort [id#69 ASC NULLS FIRST], false, 0
   <----:  +- BucketUnion 200 buckets, bucket columns: [id]---->
   <----:     :- *(1) Project [id#69, name#70]---->
   <----:     :  +- *(1) Filter ((isnotnull(id#69) && (id#69 = 1)) && (id#69 >= 1))---->
   <----:     :     +- *(1) FileScan parquet [id#69,name#70] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>, SelectedBucketsCount: 1 out of 200---->
   <----:     +- Exchange hashpartitioning(id#69, 200)---->
   <----:        +- *(2) Project [id#69, name#70]---->
   <----:           +- *(2) Filter ((isnotnull(id#69) && (id#69 = 1)) && (id#69 >= 1))---->
   <----:              +- *(2) FileScan parquet [id#69,name#70] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table/part-00003-0fc50086-fd6e-4527-..., PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>---->
   +- *(6) Sort [id#102 ASC NULLS FIRST], false, 0
      <----+- BucketUnion 200 buckets, bucket columns: [id]---->
         <----:- *(4) Project [id#102, name#103]---->
         <----:  +- *(4) Filter ((isnotnull(id#102) && (id#102 >= 1)) && (id#102 = 1))---->
         <----:     +- *(4) FileScan parquet [id#102,name#103] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>, SelectedBucketsCount: 1 out of 200---->
         <----+- Exchange hashpartitioning(id#102, 200)---->
            <----+- *(5) Project [id#102, name#103]---->
               <----+- *(5) Filter ((isnotnull(id#102) && (id#102 >= 1)) && (id#102 = 1))---->
                  <----+- *(5) FileScan parquet [id#102,name#103] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table/part-00003-0fc50086-fd6e-4527-..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>---->

=============================================================
Plan without indexes:
=============================================================
Project [id#69, name#70, name#103]
+- SortMergeJoin [id#69], [id#102], Inner
   :- *(2) Sort [id#69 ASC NULLS FIRST], false, 0
   <----:  +- Exchange hashpartitioning(id#69, 200)---->
   <----:     +- *(1) Project [id#69, name#70]---->
   <----:        +- *(1) Filter ((isnotnull(id#69) && (id#69 = 1)) && (id#69 >= 1))---->
   <----:           +- *(1) FileScan parquet [id#69,name#70] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,1), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>---->
   +- *(4) Sort [id#102 ASC NULLS FIRST], false, 0
      <----+- Exchange hashpartitioning(id#102, 200)---->
         <----+- *(3) Project [id#102, name#103]---->
            <----+- *(3) Filter ((isnotnull(id#102) && (id#102 >= 1)) && (id#102 = 1))---->
               <----+- *(3) FileScan parquet [id#102,name#103] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/table], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1), EqualTo(id,1)], ReadSchema: struct<id:int,name:string>---->

=============================================================
Indexes used:
=============================================================
index33:file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index33/v__=0

How was this patch tested?

Unit test & TPCH validation

@rapoth rapoth requested review from apoorvedave1 and pirz September 16, 2020 01:11
@rapoth rapoth added this to the 0.4.0 milestone Sep 16, 2020
@rapoth rapoth added the advanced issue and enhancement labels Sep 16, 2020
@rapoth
Contributor

rapoth commented Sep 16, 2020

The query plan is modified in the optimizer to support this, and the changed plan can be checked with the hs.explain() API.

Can you add an example of how hs.explain will look like for a simple scenario?

Contributor

@imback82 imback82 left a comment

I did one round of review (but not tests yet). It looks pretty cool!

// Remove sort order because we cannot guarantee the ordering of source files
val bucketSpec = index.bucketSpec.copy(sortColumnNames = Seq())

object ExtractTopLevelPlanForShuffle {
Contributor

Can we define this outside this function with proper comment, etc.?

Collaborator Author

@sezruby sezruby Sep 18, 2020

It's located here because it needs index.indexedColumns. I tried to define it outside and pass the column names, but I couldn't find a way.

Anyway, it seems this extractor is not required for now. #165 (comment)

* @param indexPlan replaced plan with index
* @return complementIndexPlan integrated plan of indexPlan and complementPlan
*/
private def getComplementIndexPlan(
Contributor

This seems to be the "return" value of getHybridScanIndexPlan, so why is this called getComplementPlan? What is it complementing? From what I can understand, the use of "complement" doesn't seem appropriate. I don't have any good ideas, but I wanted to understand the semantic meaning here.

Collaborator Author

I used "complement" because this additional plan (for the appended files) complements the index plan.
getCompleteIndexPlan / getComplementaryIndexPlan / .. 🤔

Any suggestions are welcome. :)

Contributor

I saw your other renames. How about complementIndexScanWithDataScan? (since you are already using a verb like transform, I thought we could use complement as a word)

Collaborator Author

I refactored getComplementPlan into two different functions: transformPlanWithAppendedFiles and shufflePlanWithIndexSpec :)

Contributor

@rapoth rapoth left a comment

Minor opinionated comments but please feel free to ignore. Looking great, thanks!

* @param indexPlan replaced plan with index
* @return complementIndexPlan integrated plan of indexPlan and complementPlan
*/
private def getComplementIndexPlan(
Contributor

I saw your other renames. How about complementIndexScanWithDataScan? (since you are already using a verb like transform, I thought we could use complement as a word)

@sezruby
Collaborator Author

sezruby commented Sep 22, 2020

@pirz @apoorvedave1 Could you do a review for this PR? Thanks!

@apoorvedave1
Contributor

LGTM 👍 thanks @sezruby

apoorvedave1
apoorvedave1 previously approved these changes Sep 25, 2020
_) =>
val curFileSet = location.allFiles
.map(f => FileInfo(f.getPath.toString, f.getLen, f.getModificationTime))
filesAppended =
Contributor

I think filesAppended (and deleted) should be calculated before this function gets called, and this function should be called only when the relation is eligible for hybrid scan. We don't have to address this now, but I will bring this up in a later PR.

Collaborator Author

Good point - I'll handle this with #171 or another PR.

// shuffle the appended data in the same way to correctly merge with bucketed index data.

// Clear sortColumnNames as BucketUnion does not keep the sort order within a bucket.
val bucketSpec = index.bucketSpec.copy(sortColumnNames = Seq())
Contributor

nit: we can use Nil here. (more consistent in this file)

Contributor

Maybe we should just use numBuckets in BucketUnion*? Are we using any other fields from BucketSpec?

Also, can you add an assert in BucketUnionExec.outputPartitioning that the number of partitions is the same as the number of buckets? I think that's our assumption, right?

assert(children.head.outputPartitioning.asInstanceOf[HashPartitioning].numPartitions == bucketSpec.numBuckets)
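
For reference, a standalone sketch of that check (an illustrative helper, not the PR's code):

import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}

// Validates the assumption behind BucketUnionExec.outputPartitioning: every child is
// hash-partitioned into exactly numBuckets partitions, so the union can claim that partitioning.
def checkedBucketPartitioning(childPartitionings: Seq[Partitioning], numBuckets: Int): HashPartitioning = {
  val head = childPartitionings.head match {
    case h: HashPartitioning => h
    case other => sys.error(s"Expected HashPartitioning but got $other")
  }
  assert(childPartitionings.forall(_ == head), "All children must share the same hash partitioning.")
  assert(head.numPartitions == numBuckets, "Number of partitions must equal the number of buckets.")
  head
}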

Contributor

@imback82 imback82 Sep 25, 2020

Btw, do we need to check if the bucketed columns are the partitioning expressions (somewhere in BucketUnion*)?

Collaborator Author

Maybe we should just use numBuckets in BucketUnion*? Are we using any other fields from BucketSpec?

Yes, currently only numBuckets is used in BucketUnion*. But the bucket column & sort column info are also shown in the plan, which might help with analysis. And maybe we could use them later in optimization.

Btw, do we need to check if the bucketed columns are the partitioning expressions (somewhere in BucketUnion*)?

I tried to find a way to check the partitioning expressions in BucketUnion*, but there's no such API for it.

Contributor

Yes currently only numBuckets is used in BucketUnion*. But bucket column & sort column info are also shown in plan, which might help to analyze. And maybe we could use it later in optimization.

The confusion I have is that the following sounds like it has an effect, whereas it doesn't really do anything:

// Clear sortColumnNames as BucketUnion does not keep the sort order within a bucket.
val bucketSpec = index.bucketSpec.copy(sortColumnNames = Seq())

Contributor

Also, as far as I know, the bucket spec is meant to be used for the scan node. So it may be more confusing if we print it out in the BucketUnion. We can take this up as a separate PR, but I think we need to address this.

Collaborator Author

OK, I revised the comment a bit. But I think the information might help in understanding the behavior. Does this look confusing?

 +- *(5) Project [o_orderkey#61L, o_orderdate#65, o_shippriority#68]
:        +- *(5) SortMergeJoin [c_custkey#5L], [o_custkey#62L], Inner
:           :- *(1) Project [c_custkey#5L]
:           :  +- *(1) Filter ((isnotnull(c_mktsegment#11) && (c_mktsegment#11 = BUILDING)) && isnotnull(c_custkey#5L))
:           :     +- *(1) FileScan parquet [c_custkey#5L,c_mktsegment#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://eunjin-hyperspace-test@taruntestdiag.blob.core.windows.net/indexes/index..., PartitionFilters: [], PushedFilters: [IsNotNull(c_mktsegment), EqualTo(c_mktsegment,BUILDING), IsNotNull(c_custkey)], ReadSchema: struct<c_custkey:bigint,c_mktsegment:string>, SelectedBucketsCount: 200 out of 200
:           +- *(4) Sort [o_custkey#62L ASC NULLS FIRST], false, 0
:              +- BucketUnion 200 buckets, bucket columns: [o_custkey]             <===============
:                 :- *(2) Project [o_orderkey#61L, o_custkey#62L, o_orderdate#65, o_shippriority#68]
:                 :  +- *(2) Filter (((isnotnull(o_orderdate#65) && (o_orderdate#65 < 1995-03-15)) && isnotnull(o_custkey#62L)) && isnotnull(o_orderkey#61L))
:                 :     +- *(2) FileScan parquet [o_custkey#62L,o_orderkey#61L,o_orderdate#65,o_shippriority#68] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://eunjin-hyperspace-test@taruntestdiag.blob.core.windows.net/indexes/index..., PartitionFilters: [], PushedFilters: [IsNotNull(o_orderdate), LessThan(o_orderdate,1995-03-15), IsNotNull(o_custkey), IsNotNull(o_orde..., ReadSchema: struct<o_custkey:bigint,o_orderkey:bigint,o_orderdate:string,o_shippriority:int>, SelectedBucketsCount: 200 out of 200
:                 +- Exchange hashpartitioning(o_custkey#62L, 200)
:                    +- *(3) Project [o_orderkey#61L, o_custkey#62L, o_orderdate#65, o_shippriority#68]
:                       +- *(3) Filter (((isnotnull(o_orderdate#65) && (o_orderdate#65 < 1995-03-15)) && isnotnull(o_custkey#62L)) && isnotnull(o_orderkey#61L))
:                          +- *(3) FileScan parquet [o_custkey#62L,o_orderkey#61L,o_orderdate#65,o_shippriority#68] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://eunjin-hyperspace-test@taruntestdiag.blob.core.windows.net/data/tpch-par..., PartitionFilters: [], PushedFilters: [IsNotNull(o_orderdate), LessThan(o_orderdate,1995-03-15), IsNotNull(o_custkey), IsNotNull(o_orde..., ReadSchema: struct<o_custkey:bigint,o_orderkey:bigint,o_orderdate:string,o_shippriority:int>

Contributor

I think it may be better if we print out the output partitioning, similar to Exchange since we are unioning plans that have the same hash partitioning.

Contributor

But the new comment sounds better; let's take this up separately.

@pirz
Contributor

pirz commented Sep 25, 2020

LGTM, Thanks @sezruby

Comment on lines +392 to +393
// in their output; Case 1 won't be shown in use cases. The implementation is kept
// for future use cases.
Contributor

I couldn't comment on #165 (comment), but is this now being tested? If it's not covered, then I wouldn't add the case and would just throw if this case is hit.

Collaborator Author

This behavior is covered by the test below: test("Verify the location of injected shuffle for Hybrid Scan.")

// shuffle the appended data in the same way to correctly merge with bucketed index data.

// Clear sortColumnNames as BucketUnion does not keep the sort order within a bucket.
val bucketSpec = index.bucketSpec.copy(sortColumnNames = Seq())
Contributor

Yes currently only numBuckets is used in BucketUnion*. But bucket column & sort column info are also shown in plan, which might help to analyze. And maybe we could use it later in optimization.

The confusion I have is that the following sounds like it has an effect, whereas it doesn't really do anything:

// Clear sortColumnNames as BucketUnion does not keep the sort order within a bucket.
val bucketSpec = index.bucketSpec.copy(sortColumnNames = Seq())

}

test("Verify the location of injected shuffle for Hybrid Scan.") {
val dataPath = systemPath.toString + "/hbtable"
Contributor

Can you check if you can utilize withTempDir? I just committed.

Contributor

@imback82 imback82 left a comment

Looks good to me! (pending minor comments)

// Make sure there is no shuffle.
execPlan.foreach(p => assert(!p.isInstanceOf[ShuffleExchangeExec]))

checkAnswer(baseQuery, filter)
Contributor

Test looks much better! 👍

Comment on lines 182 to 183
withTempDir { tempDir =>
val dataPath = tempDir + "/hbtable"
Contributor

You can just do withTempDir { dataPath => ... }, right? (No need for "/hbtable".)

Collaborator Author

@sezruby sezruby Sep 26, 2020

"/hbtabe" is requires as tempDir is directory - write.parquet(dataPath) will fail

Contributor

Ah, it fails because the directory is already created. You need withTempPath (which has the same implementation as withTempDir, except that it deletes the created directory). I pushed the changes.

Btw, parquet() works on a directory (if you do overwrite).
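
For context, a minimal sketch of what such a withTempPath helper typically looks like (assumed shape, not necessarily the committed code):

import java.io.File
import java.nio.file.Files

// Deletes a directory tree; File.delete() alone only removes files and empty directories.
def deleteRecursively(file: File): Unit = {
  Option(file.listFiles).foreach(_.foreach(deleteRecursively))
  file.delete()
}

// Like withTempDir, but the directory is deleted up front so the test receives a path that
// does not exist yet, allowing e.g. df.write.parquet(path) without overwrite mode.
def withTempPath(f: File => Unit): Unit = {
  val path = Files.createTempDirectory("hyperspace-test").toFile
  path.delete()
  try f(path) finally deleteRecursively(path)
}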

Contributor

@imback82 imback82 left a comment

LGTM, thanks @sezruby! 🚀🚀🚀

@imback82 imback82 merged commit 83a9ab5 into microsoft:master Sep 26, 2020
@imback82
Contributor

Merged to master. Great work @sezruby! cc: @rapoth

@sezruby sezruby deleted the hybridscan_plan branch October 7, 2020 13:17