Add support for delete to index refresh #142
sezruby
left a comment
The approach and WIP code generally LGTM. Thanks!
src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
imback82
left a comment
Is this WIP?
src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
src/test/scala/com/microsoft/hyperspace/index/RefreshIndexDeleteTests.scala
    var currentFiles = Seq[String]()
    rels.head.rootPaths.foreach { p =>
      currentFiles ++= Content
        .fromDirectory(path = new Path(p))
        .files
        .map(_.toString)
    }
Suggested change:
    - var currentFiles = Seq[String]()
    - rels.head.rootPaths.foreach { p =>
    -   currentFiles ++= Content
    -     .fromDirectory(path = new Path(p))
    -     .files
    -     .map(_.toString)
    - }
    + val currentFiles = rels.head.rootPaths.flatMap { p =>
    +   Content
    +     .fromDirectory(path = new Path(p))
    +     .files
    +     .map(_.toString)
    + }
nit: it might be better to explore IndexLogEntry.listLeafFiles() api for file listing here. We can move that function to PathUtils class for more generic use.
We can do that as a separate PR to keep this one simple.
(1. move listLeafFiles from IndexLogEntry to PathUtils. 2. Use listLeafFiles here instead of Content)
Thanks, I went with your suggestion. I agree on making IndexLogEntry.listLeafFiles() available to other classes, as it is more of a utility function that will come up in different scenarios (like the one above).
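For illustration, the flatMap-based listing discussed in this thread can be sketched without Spark or Hadoop. The `listLeafFiles` helper below is a hypothetical stand-in built on `java.io.File`; it is not the actual IndexLogEntry.listLeafFiles() implementation, whose signature is not shown in this conversation.

```scala
import java.io.File

object LeafFileListing {
  // Hypothetical stand-in for a shared file-listing utility.
  // Recursively collects the paths of all regular files under `dir`.
  def listLeafFiles(dir: File): Seq[String] = {
    // listFiles() returns null if `dir` is not a readable directory.
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap { f =>
      if (f.isDirectory) listLeafFiles(f) else Seq(f.getPath)
    }
  }

  // Immutable alternative to `var currentFiles` plus `foreach`:
  // flatMap concatenates the per-root listings into one sequence.
  def listAll(rootPaths: Seq[String]): Seq[String] =
    rootPaths.flatMap(p => listLeafFiles(new File(p)))
}
```

The point of the `flatMap` form is only that it avoids a mutable accumulator; the result is the same sequence the `foreach` version would build.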
src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
apoorvedave1
left a comment
left some minor comments, Thanks @pirz
@pirz For the following:
Can you please add it as follows:
Old experience:
New experience: If user enables
    val indexDF = spark.read.parquet(previousIndexLogEntry.content.files.map(_.toString): _*)

    ResolverUtils
      .resolve(spark, IndexConstants.DATA_FILE_NAME_COLUMN, indexDF.schema.fieldNames) match {
It would be good to move this to IndexLogEntry as previousIndexLogEntry.hasLineageColumn, since I also need this utility function :) I also think it's better to check this first using only previousIndexLogEntry.schema, and skip if it is false, before the spark.read.parquet(...) call above.
sure, I added def hasLineageColumn(spark: SparkSession): Boolean = {...} to IndexLogEntry.
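A schema-only check of this kind might look like the sketch below. It works on a plain list of field names rather than a SparkSession, so it is not the method added to IndexLogEntry. The column name `_data_file_name` and the `caseSensitive` parameter are assumptions for illustration; the real constant is IndexConstants.DATA_FILE_NAME_COLUMN, and the real code resolves names through ResolverUtils.

```scala
object LineageColumnCheck {
  // Assumed placeholder for IndexConstants.DATA_FILE_NAME_COLUMN;
  // the actual value is not shown in this conversation.
  val LineageColumn: String = "_data_file_name"

  // Returns true if the index schema contains the lineage column.
  // `caseSensitive` is a hypothetical parameter that mimics how a
  // resolver might compare column names.
  def hasLineageColumn(fieldNames: Seq[String], caseSensitive: Boolean): Boolean =
    if (caseSensitive) fieldNames.contains(LineageColumn)
    else fieldNames.exists(_.equalsIgnoreCase(LineageColumn))
}
```

Because the check needs only field names, it can run before any index files are read, which is the ordering suggested above.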
     */
    private def getDeletedFiles: Seq[String] = {
      val rels = previousIndexLogEntry.relations
      val originalFiles = rels.head.data.properties.content.files.map(_.toString)
Could you check the file metadata (size / modification time ) as well?
@sezruby Here we are looking for "deleted" files, and the file name suffices for that check. If a file is renamed, we still mark it as deleted; however, I am not sure whether modifying the content of an existing file is a valid scenario. Can you explain a bit more what exactly you are suggesting? Thanks.
Good point. I think the user can "overwrite" the files.
I think if we detect a mismatch of metadata, we shouldn't perform the action, but suggest to perform full refresh.
@imback82 Does that mean for existing files (those which are present both in originalFiles and currentFiles) we should do a full metadata comparison here and if there is a mismatch we abort on-going refresh action?
+1 It might be better to just do a full metadata comparison and abort (with a suggestion). It is becoming increasingly clear that we should be on the safe side. :)
I think we can handle a modified file as both a deleted and an appended file. We can abort the delete refresh in this PR for now; later, with the append implementation, we could apply delete and append refresh to the file properly.
In case the metadata isn't checked here, signatureValid will fail. It won't fail because of the refresh. Then the result might be different.
FileInfo check added along with a new test case under RefreshIndexTests.scala to validate it.
@apoorvedave1 as FYI
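As a rough sketch of the outcome of this thread: deleted files are the original names missing from the current listing, and any file present in both listings with different metadata aborts the action with a suggestion to do a full refresh. `FileMeta` and `findDeletedFiles` are hypothetical names for illustration, not the FileInfo check that was actually added.

```scala
object DeletedFileDetection {
  // Hypothetical per-file metadata record (name, size, modification time).
  final case class FileMeta(name: String, size: Long, modifiedTime: Long)

  // Returns Right(deleted file names), or Left(message) when a file that
  // still exists has different size or modification time than recorded.
  def findDeletedFiles(
      original: Seq[FileMeta],
      current: Seq[FileMeta]): Either[String, Seq[String]] = {
    val currentByName = current.map(f => f.name -> f).toMap
    // Files present in both listings whose metadata no longer matches.
    val modified = original.filter(f => currentByName.get(f.name).exists(_ != f))
    if (modified.nonEmpty) {
      Left("Metadata mismatch for: " + modified.map(_.name).mkString(", ") +
        "; consider a full refresh.")
    } else {
      Right(original.map(_.name).filterNot(name => currentByName.contains(name)))
    }
  }
}
```

Returning `Either` here is a design choice of the sketch; the actual action may signal the abort differently, for example by throwing an exception.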
src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
sezruby
left a comment
LGTM Thanks @pirz !
LGTM, thanks @pirz!
apoorvedave1
left a comment
LGTM , Thanks @pirz
src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
    def query(): DataFrame =
      spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")

    // Verify index usage on latest version of index (v=1) after refresh.
If you are verifying this way, I would do the following:
- verifyIndexUsage with version 0.
- Delete the file -> verify index is not utilized.
- verifyIndexUsage with version 1.
I modified the test case
Well, if there is no verification before applying refresh, we are not really validating anything (it's possible that the test set up was wrong, refresh didn't work, etc.). To me, it's crucial to test this e2e scenario in E2E tests.
src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
imback82
left a comment
LGTM, thanks @pirz!
Merged to master. Nice work!
Looks awesome, thank you @pirz! 👍
What is the context for this pull request?
What changes were proposed in this pull request?
This PR handles updating index during refresh w.r.t deleted source data files. Updating index for newly appended source data files will be done via a separate PR #163.
This change adds capability to Refresh index by removing index entries from any deleted source data file.
Note this Refresh Action only fixes an index w.r.t deleted source data files and does not consider new source data files (if any). If some original source data file(s) are removed between previous version of index and now, this Refresh Action updates the index as follows:
Currently, this feature is protected under a Spark configuration flag, spark.hyperspace.index.refresh.delete.enabled, and is disabled by default.
Why are the changes needed?
Currently, when a user removes some data file(s), a full index rebuild is the only way to refresh an affected index and remove the deleted records from it. This change makes incremental index refresh possible for such cases by fixing the index files without scanning any source data.
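The idea can be sketched with plain Scala collections: each index row records which source data file it came from, so rows belonging to deleted files can be dropped by a filter over the index alone. `IndexRow` and `removeDeletedRows` are hypothetical names; the actual action works on the index Parquet data through Spark and the lineage column, not on in-memory sequences.

```scala
object DeleteRefreshSketch {
  // Hypothetical index row: indexed columns plus a lineage field that
  // names the source data file the row was built from.
  final case class IndexRow(c3: String, c1: Int, dataFileName: String)

  // Keeps only rows whose source file still exists. No source data is
  // read; the decision uses the lineage field stored in the index.
  def removeDeletedRows(index: Seq[IndexRow], deletedFiles: Set[String]): Seq[IndexRow] =
    index.filterNot(row => deletedFiles.contains(row.dataFileName))
}
```

For example, with rows from `part-0.parquet` and `part-1.parquet` and a deleted set containing only `part-1.parquet`, the refreshed index keeps just the `part-0.parquet` rows.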
Does this PR introduce any user-facing change?
Yes, it changes the behavior of index refresh and helps with incremental index refresh to remove deleted index records.
Old experience:
New experience:
Steps 1 - 4 remain the same.
If the user does not enable spark.hyperspace.index.refresh.delete.enabled, then the Hyperspace experience remains similar to 5 and 6 above.
If the user enables spark.hyperspace.index.refresh.delete.enabled, then:
How was this patch tested?
New test cases are added under a new test suite, RefreshIndexDeleteTests.scala.