This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Conversation

@sezruby (Collaborator) commented Jan 18, 2021

What is the context for this pull request?

  • Tracking Issue: n/a
  • Parent Issue: n/a
  • Dependencies: n/a

What changes were proposed in this pull request?

Introduce a wrapper class of HadoopFsRelation to indicate index application explicitly in the plan string. When an index is applied, IndexHadoopFsRelation is used in place of HadoopFsRelation as an identifier (a minimal sketch follows the example below).

Instead of "parquet", the FileScan node now prints the applied index information, e.g. s"Hyperspace(Type: CI, Name: $indexName, LogVersion: $indexLogVersion)", changing

 +- FileScan parquet [clicks#1066, ...

to

+- FileScan Hyperspace(Type: CI, Name: index_Both, LogVersion: 1) [clicks#1823, ...
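For context, a minimal sketch of the wrapper. Assumptions: Spark 2.4's HadoopFsRelation signature; the indexName/indexLogVersion fields and the exact constructor shape are illustrative, not the PR's actual signature.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation}
import org.apache.spark.sql.types.StructType

// A plain class (not a case class) wrapping HadoopFsRelation. FileSourceScanExec
// builds "FileScan <relation.toString> [...]", so overriding toString is enough
// to surface the index name and log version in the plan string.
class IndexHadoopFsRelation(
    location: FileIndex,
    partitionSchema: StructType,
    dataSchema: StructType,
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String],
    indexName: String,        // illustrative extra fields
    indexLogVersion: Int)(spark: SparkSession)
  extends HadoopFsRelation(
    location, partitionSchema, dataSchema, bucketSpec, fileFormat, options)(spark) {

  override def toString: String =
    s"Hyperspace(Type: CI, Name: $indexName, LogVersion: $indexLogVersion)"
}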

Does this PR introduce any user-facing change?

Yes, the plan with an index applied changes as follows.
Current SparkPlan:

Project [clicks#1066, query#1064, Date#1072]
+- SortMergeJoin [clicks#1066], [clicks#1076], Inner
   :- Sort [clicks#1066 ASC NULLS FIRST], false, 0
   :  +- Filter (((clicks#1066 <= 4000) && (clicks#1066 >= 2000)) && isnotnull(clicks#1066))
   :     +- BucketUnion 200 buckets, bucket columns: [clicks]
   :        :- Project [clicks#1066, query#1064]
   :        :  +- Filter ((((isnotnull(_data_file_id#1103L) && NOT (_data_file_id#1103L = 3)) && isnotnull(clicks#1066)) && (clicks#1066 >= 2000)) && (clicks#1066 <= 4000))
   :        :     +- FileScan parquet [clicks#1066,Query#1064,_data_file_id#1103L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/path/to/hybridScanTest/index..., PartitionFilters: [], PushedFilters: [IsNotNull(_data_file_id), Not(EqualTo(_data_file_id,3)), IsNotNull(clicks), GreaterThanOrEqual(c..., ReadSchema: struct<clicks:int,Query:string,_data_file_id:bigint>, SelectedBucketsCount: 200 out of 200
   :        +- Exchange hashpartitioning(clicks#1066, 200)
   :           +- Project [clicks#1066, query#1064]
   :              +- Filter ((isnotnull(clicks#1066) && (clicks#1066 >= 2000)) && (clicks#1066 <= 4000))
   :                 +- FileScan parquet [Query#1064,clicks#1066] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/path/to/resources/data/sample6/part-00..., PartitionFilters: [], PushedFilters: [IsNotNull(clicks), GreaterThanOrEqual(clicks,2000), LessThanOrEqual(clicks,4000)], ReadSchema: struct<Query:string,clicks:int>
   +- Project [clicks#1076, Date#1072]
      +- Filter (((NOT _data_file_id#1104L INSET (2,3) && isnotnull(clicks#1076)) && (clicks#1076 <= 4000)) && (clicks#1076 >= 2000))
         +- FileScan parquet [clicks#1076,Date#1072,_data_file_id#1104L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/path/to/hybridScanTest/index..., PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), IsNotNull(clicks), LessThanOrEqual(clicks,4000), GreaterThanOrEqu..., ReadSchema: struct<clicks:int,Date:string,_data_file_id:bigint>, SelectedBucketsCount: 200 out of 200

SparkPlan with this PR:

Project [clicks#1823, query#1821, Date#1829]
+- SortMergeJoin [clicks#1823], [clicks#1833], Inner
   :- Sort [clicks#1823 ASC NULLS FIRST], false, 0
   :  +- Filter (((clicks#1823 >= 2000) && isnotnull(clicks#1823)) && (clicks#1823 <= 4000))
   :     +- BucketUnion 200 buckets, bucket columns: [clicks]
   :        :- Project [clicks#1823, query#1821]
   :        :  +- Filter ((((isnotnull(_data_file_id#1860L) && NOT (_data_file_id#1860L = 3)) && isnotnull(clicks#1823)) && (clicks#1823 >= 2000)) && (clicks#1823 <= 4000))
   :        :     +- FileScan Hyperspace(Type: CI, Name: index_Both, LogVersion: 1) [clicks#1823,Query#1821,_data_file_id#1860L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/repo/hyperspace2/src/test/resources/hybridScanTest/index..., PartitionFilters: [], PushedFilters: [IsNotNull(_data_file_id), Not(EqualTo(_data_file_id,3)), IsNotNull(clicks), GreaterThanOrEqual(c..., ReadSchema: struct<clicks:int,Query:string,_data_file_id:bigint>, SelectedBucketsCount: 200 out of 200
   :        +- Exchange hashpartitioning(clicks#1823, 200)
   :           +- Project [clicks#1823, query#1821]
   :              +- Filter ((isnotnull(clicks#1823) && (clicks#1823 >= 2000)) && (clicks#1823 <= 4000))
   :                 +- FileScan parquet [Query#1821,clicks#1823] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/AppData/Local/Temp/spark-a6e46b19-7723-400a-9322-6feb6e0..., PartitionFilters: [], PushedFilters: [IsNotNull(clicks), GreaterThanOrEqual(clicks,2000), LessThanOrEqual(clicks,4000)], ReadSchema: struct<Query:string,clicks:int>
   +- Project [clicks#1833, Date#1829]
      +- Filter (((NOT _data_file_id#1861L INSET (2,3) && isnotnull(clicks#1833)) && (clicks#1833 <= 4000)) && (clicks#1833 >= 2000))
         +- FileScan Hyperspace(Type: CI, Name: indexType2_Delete, LogVersion: 1) [clicks#1833,Date#1829,_data_file_id#1861L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/repo/hyperspace2/src/test/resources/hybridScanTest/index..., PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), IsNotNull(clicks), LessThanOrEqual(clicks,4000), GreaterThanOrEqu..., ReadSchema: struct<clicks:int,Date:string,_data_file_id:bigint>, SelectedBucketsCount: 200 out of 200

Logical Plan with this PR:

Project [clicks#1823, query#1821, Date#1829]
+- Join Inner, (clicks#1823 = clicks#1833)
   :- BucketUnion 200 buckets, bucket columns: [clicks]
   :  :- Project [clicks#1823, query#1821]
   :  :  +- Filter ((isnotnull(clicks#1823) && (clicks#1823 >= 2000)) && (clicks#1823 <= 4000))
   :  :     +- Project [Query#1821, clicks#1823]
   :  :        +- Filter NOT (_data_file_id#1860L = 3)
   :  :           +- Relation[Query#1821,clicks#1823,_data_file_id#1860L] Hyperspace(Type: CI, Name: index_Both, LogVersion: 1)
   :  +- RepartitionByExpression [clicks#1823], 200
   :     +- Project [clicks#1823, query#1821]
   :        +- Filter ((isnotnull(clicks#1823) && (clicks#1823 >= 2000)) && (clicks#1823 <= 4000))
   :           +- Relation[Query#1821,clicks#1823] parquet
   +- Project [clicks#1833, Date#1829]
      +- Filter ((isnotnull(clicks#1833) && (clicks#1833 <= 4000)) && (clicks#1833 >= 2000))
         +- Project [Date#1829, clicks#1833]
            +- Filter NOT _data_file_id#1861L INSET (2,3)
               +- Relation[Date#1829,clicks#1833,_data_file_id#1861L] Hyperspace(Type: CI, Name: indexType2_Delete, LogVersion: 1)

How was this patch tested?

Unit test

@sezruby self-assigned this Jan 18, 2021
@sezruby (Collaborator, Author) commented Jan 18, 2021

@rapoth @imback82 let me know if you'd prefer a different string instead of "CoveringIndex".

I also think it would be good to print the applied index name & version in the plan. WDYT? How about

FileScan CoveringIndex indexName_version ?

@sezruby changed the title from "Introduce IndexHadoopFsRelation" to "Introduce IndexHadoopFsRelation to show applied index name & version in query plan" Jan 20, 2021
@imback82 (Contributor) left a comment

Generally seems fine to me, but I need approval from @apoorvedave1 on the id changes.

/**
 * Wrapper class of HadoopFsRelation to indicate index application more explicitly in plan string.
 */
class IndexHadoopFsRelation(
Contributor:

Is there a reason for choosing class over case class, same as HadoopFsRelation? Since it's a wrapper, I would think both would be of the same type.

Collaborator (Author):

Because case-to-case class inheritance is prohibited in Scala.
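For reference, a minimal illustration with hypothetical types:

case class Base(x: Int)

// Does not compile: "case class Derived has case ancestor Base,
// but case-to-case inheritance is prohibited."
// case class Derived(y: Int) extends Base(y)

// A plain class extending a case class is allowed, which is why
// IndexHadoopFsRelation is a regular class.
class Derived(x: Int, val y: Int) extends Base(x)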

Comment on lines 546 to 548
def logVersion: String = {
  properties.getOrElse(IndexConstants.INDEX_LOG_VERSION_PROPERTY, "undefined")
}
Contributor:

Instead of introducing a key in the map, can't we simply use super.id directly here? I mean:

def logVersion: String = id.toString

If we do this, we will not even need to introduce the endId() def in the Actions.scala class: Action.begin() and Action.end() set this id themselves, so it's always up to date with the actual file name of the log entry file.
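In other words, the suggestion replaces the property-map lookup with the id the actions already maintain (a sketch, assuming LogEntry exposes an id field):

// Before: version tracked via an extra property key.
def logVersion: String = {
  properties.getOrElse(IndexConstants.INDEX_LOG_VERSION_PROPERTY, "undefined")
}

// Suggested: derive it from LogEntry.id, which Action.begin()/Action.end()
// keep in sync with the log entry file name.
def logVersion: String = id.toString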

cc @imback82

Collaborator (Author):

Thanks! I updated this PR to use LogEntry.id.

BTW, I added endId in #272 as the id is required in op(). @apoorvedave1 could you also have a look at that change?

@apoorvedave1 (Contributor) commented:

Thanks @sezruby, I left some comments. Basically, I think the key IndexConstants.INDEX_LOG_VERSION_PROPERTY and the method def endId() could both be replaced by simply using the LogEntry.id variable, which is always up to date with the file version. Please let me know if this doesn't make sense.
cc @imback82

@imback82 (Contributor) left a comment

LGTM, thanks @sezruby!

@imback82 merged commit 9192481 into microsoft:master Jan 26, 2021
@imback82 added the enhancement (New feature or request) label Jan 29, 2021
@imback82 added this to the January 2021 milestone Jan 29, 2021
@sezruby deleted the newwrapperplan branch February 3, 2021 11:54