This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Conversation

@pirz (Contributor) commented Sep 23, 2020

What is the context for this pull request?

This PR extends optimizer rules to consider and leverage indexes with deleted source data files.

This is needed as part of adding support for enforcing deletes at read time.

What changes were proposed in this pull request?

This PR extends FilterIndexRule and JoinIndexRule so they can leverage an index whose source data files have been deleted, provided the index metadata has already been updated by a refresh call.

This is done by transforming the query plan: an extra pair of Filter -> Project nodes is added on top of the index scan node to exclude index records coming from the deleted source data files listed in the index's excluded files.
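For illustration, here is a minimal sketch of that wrapping using Catalyst nodes; it is not the exact Hyperspace code, and indexScan, lineageAttr, and originalOutput are assumed placeholders:

import org.apache.spark.sql.catalyst.expressions.{In, Literal, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}

// Exclude index rows whose lineage column matches a deleted source data file,
// then project away the lineage column to restore the original output.
val excludedFiles = index.excludedFiles.map(Literal(_))
val filtered = Filter(Not(In(lineageAttr, excludedFiles)), indexScan)
val wrapped = Project(originalOutput, filtered)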

Here is an example query plan transformed by FilterIndexRule for a Filter -> Project query:

// Original query plan
== Optimized Logical Plan ==
Project [c3#173, c1#171]
+- Filter (isnotnull(c3#173) && (c3#173 = facebook))
   +- Relation[c1#171,c2#172,c3#173,c4#174,c5#175] parquet
   
// Transformed plan
== Optimized Logical Plan ==
Project [c3#187, c1#185]
+- Filter (isnotnull(c3#187) && (c3#187 = facebook))
   +- Project [c1#185, c3#187]
      +- Filter NOT _data_file_name#198 IN (file:/C:/.../part-00000.parquet)
         +- Relation[c1#185,c3#187,_data_file_name#198] parquet
		 
== Physical Plan ==
*(1) Project [c3#187, c1#185]
+- *(1) Filter ((NOT _data_file_name#198 IN (file:/C:/.../part-00000.parquet) && isnotnull(c3#187)) && (c3#187 = facebook))
   +- *(1) FileScan parquet [c3#187,c1#185,_data_file_name#198] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/.../v__=0/..., PartitionFilters: [], PushedFilters: [Not(In(_data_file_name, [file:/C:/.../part-00000.parquet, ReadSchema: struct<c3:string,c1:string,_data_file_name:string>

Here is an example query plan transformed by FilterIndexRule for a select * query:

// Original query plan
== Optimized Logical Plan ==
Filter (isnotnull(c3#200) && (c3#200 = facebook))
+- Relation[c1#198,c2#199,c3#200,c4#201,c5#202] parquet

// Transformed plan  
== Optimized Logical Plan ==
Filter (isnotnull(c3#212) && (c3#212 = facebook))
+- Project [c1#210, c2#211, c3#212, c4#213, c5#214]
   +- Filter NOT _data_file_name#221 IN (file:/C:/.../part-00000.parquet)
      +- Relation[c1#210,c2#211,c3#212,c4#213,c5#214,_data_file_name#221] parquet

== Physical Plan ==
*(1) Project [c1#210, c2#211, c3#212, c4#213, c5#214]
+- *(1) Filter ((NOT _data_file_name#221 IN (file:/C:/.../part-00000.parquet) && isnotnull(c3#212)) && (c3#212 = facebook))
   +- *(1) FileScan parquet [c3#212,c4#213,c2#211,c5#214,c1#210,_data_file_name#221] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/.../v__=0/..., PartitionFilters: [], PushedFilters: [Not(In(_data_file_name, [file:/C:/.../part-00000.parquet, ReadSchema: struct<c3:string,c4:int,c2:string,c5:int,c1:string,_data_file_name:string>
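
For reference, queries of roughly the following shape would produce the two plans above; they are reconstructed from the column names in the plans, and tbl is a placeholder table name:

// Filter -> Project example
spark.sql("SELECT c3, c1 FROM tbl WHERE c3 = 'facebook'")
// select * example
spark.sql("SELECT * FROM tbl WHERE c3 = 'facebook'")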

Here is an example query plan transformed by JoinIndexRule:

// Original query plan
== Optimized Logical Plan ==
Project [C1#232, c4#245]
+- Join Inner, (c3#234 = C3#244)
   :- Project [c1#232, c3#234]
   :  +- Filter isnotnull(c3#234)
   :     +- Relation[c1#232,c2#233,c3#234,c4#235,c5#236] parquet
   +- Project [c3#244, c4#245]
      +- Filter isnotnull(C3#244)
         +- Relation[c1#242,c2#243,c3#244,c4#245,c5#246] parquet

// Transformed plan
== Optimized Logical Plan ==
Project [C1#231, c4#244]
+- Join Inner, (c3#233 = C3#243)
   :- Project [c1#231, c3#233]
   :  +- Filter isnotnull(c3#233)
   :     +- Project [c1#231, c3#233]
   :        +- Filter NOT _data_file_name#369 IN (file:/C:/.../part-00000.parquet)
   :           +- Relation[c1#231,c3#233,_data_file_name#369] parquet
   +- Project [c3#243, c4#244]
      +- Filter isnotnull(C3#243)
         +- Project [c3#243, c4#244]
            +- Filter NOT _data_file_name#370 IN (file:/C:/.../part-00000.parquet)
               +- Relation[c3#243,c4#244,_data_file_name#370] parquet

== Physical Plan ==
*(3) Project [C1#231, c4#244]
+- *(3) SortMergeJoin [c3#233], [C3#243], Inner
   :- *(1) Project [c1#231, c3#233]
   :  +- *(1) Filter (NOT _data_file_name#369 IN (file:/C:/.../part-00000.parquet) && isnotnull(c3#233))
   :     +- *(1) FileScan parquet [c3#233,c1#231,_data_file_name#369] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/.../leftIndex/v__=0/..., PartitionFilters: [], PushedFilters: [Not(In(_data_file_name, [file:/C:/myGit/hyperspace-1/src/test/resources/e2eTests/ixRefreshTest/p..., ReadSchema: struct<c3:string,c1:string,_data_file_name:string>, SelectedBucketsCount: 200 out of 200
   +- *(2) Project [c3#243, c4#244]
      +- *(2) Filter (NOT _data_file_name#370 IN (file:/C:/.../part-00000.parquet) && isnotnull(C3#243))
         +- *(2) FileScan parquet [c3#243,c4#244,_data_file_name#370] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/.../rightIndex/v__=0/, PartitionFilters: [], PushedFilters: [Not(In(_data_file_name, [file:/C:/myGit/hyperspace-1/src/test/resources/e2eTests/ixRefreshTest/p..., ReadSchema: struct<c3:string,c4:int,_data_file_name:string>, SelectedBucketsCount: 200 out of 200
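
The join plans above correspond to a query of roughly this shape, reconstructed from the plan; t1 and t2 are placeholder table names:

spark.sql("SELECT t1.c1, t2.c4 FROM t1 JOIN t2 ON t1.c3 = t2.c3")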

Does this PR introduce any user-facing change?

Yes. With this PR, users can leverage an index with deleted source data files at query time.
Once such an index is leveraged, query plans show an extra pair of Filter and Project nodes, added to exclude index records originating from deleted source data files.

How was this patch tested?

Test cases are added in E2EHyperspaceRulesTests.scala.

@pirz pirz marked this pull request as draft September 23, 2020 21:25
@pirz pirz self-assigned this Sep 23, 2020
@pirz pirz added this to the 0.4.0 milestone Sep 23, 2020
@pirz pirz added the advanced issue and enhancement labels Sep 23, 2020
@rapoth (Contributor) left a comment:


Looking good so far, thanks!

// Diff context (Scala) from the rule: lAttr is the lineage attribute created
// just above with NamedExpression.newExprId; the Filter below excludes index
// rows originating from deleted source data files.
val deletedFileNames = index.excludedFiles.map(Literal(_)).toArray
val rel = baseRelation.copy(relation = relation, output = updatedOutput ++ Seq(lAttr))
val filter = Filter(condition = Not(In(lAttr, deletedFileNames)), rel)
Contributor commented:

@pirz One thing to evaluate here is what happens when a large number of files are deleted. For instance, I can imagine the query plan getting pretty big. Can you run a quick benchmark with a few thousand deleted files? I think this will have implications for query compilation time. An alternative would be to express the exclusion as a JOIN against another table containing the list of deleted files (see the sketch below).

CC: @imback82 @sezruby @apoorvedave1 for a second opinion in case they think this is unnecessary.
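
To make the alternative concrete, here is a sketch of the exclusion expressed as a left anti join; deletedFilesDf is a hypothetical one-column DataFrame of deleted file paths, and indexDf stands for the index scan:

// Keep only index rows whose source file is absent from the deleted-files table.
val pruned = indexDf.join(
  deletedFilesDf,
  indexDf("_data_file_name") === deletedFilesDf("path"),
  "left_anti")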

@sezruby (Collaborator) commented Sep 30, 2020:

It would be good if @pirz could run TPC-H on a 100k-chunk dataset to evaluate the injected filter condition for deleted files while I'm OOF this week :)

  1. build index with lineage column on full dataset (lineitem/part 0~99999 files)
  2. remove files
    • case 1) delete 200 files
    • case 2) delete 5000 files
    • case 3) delete 25k files
    • case 4) delete 50k files
  3. run
    • run w/o index
    • run w/o refreshed index => similar to partial index
    • run w/ refreshed index => similar to hybrid scan, delete-only
    • run w/ fully refreshed index

I usually run all 4x4 cases, but I think it's better to quickly test 5k or 25k deleted files with a refreshed index to see any regression (see the sketch below).
(FYI, from my experience, deleting files on remote storage one by one takes quite a lot of time...)
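
As a starting point, here is a minimal sketch (assumed setup, independent of Hyperspace) for gauging how plan compilation scales with the size of the injected NOT ... IN list; df stands for any DataFrame with a _data_file_name column:

import org.apache.spark.sql.functions.col

// Build a large exclusion list and time optimization of the filtered plan.
val deleted = (0 until 5000).map(i => s"file:/data/part-$i.parquet")
val t0 = System.nanoTime()
val optimized = df.filter(!col("_data_file_name").isin(deleted: _*)).queryExecution.optimizedPlan
println(s"optimization took ${(System.nanoTime() - t0) / 1e6} ms")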

@imback82 (Contributor) commented:

Closed by #171

@imback82 imback82 closed this Oct 13, 2020