Merge RefreshAppendAction and RefreshDeleteAction #232
Conversation
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshIncrementalActionEvent}

/**
 * Action to create indexes on newly arrived data. If the user appends new data to existing,
To reviewers:
- This file is copied from RefreshAppendAction.scala and I haven't revised the overall comments yet. I'll update them later.
- I haven't removed RefreshAppendAction.scala and RefreshDeleteAction.scala yet, for reference. I'll remove them and their events later.
@apoorvedave1 @pirz @imback82
Please leave a quick comment if the approach is not desirable. Thanks!
This approach looks good. Let's move forward with this!
Force-pushed from ddd4320 to 1a1db3b
Could you please add an example that clarifies why it is difficult?
pirz left a comment
Thanks @sezruby - the overall approach looks fine to me. I left a few comments, mostly questions/clarifications.
| "Refresh index is updating index by removing index entries " + | ||
| s"corresponding to ${deletedFiles.length} deleted source data files.") | ||
|
|
||
| if (appendedFiles.nonEmpty) { |
Question: In our current implementation of incremental refresh, we first handle deletes and then extend the index by indexing the appended files. Here you have reversed the order. Is there a specific reason for that?
If it is possible to handle deletes first and then append, do we still need to add mode: SaveMode to DataFrameWriterExtensions.saveWithBuckets?
Good point. Previously, with only file names instead of fileInfos, RefreshDeleteAction had to run first, because if we did RefreshAppendAction first we could not distinguish which rows should be removed during RefreshDeleteAction.
Now we don't need to care about the order, because we know the list of index data files that excludes the appended data.
I changed the order only because the append refresh uses a write function that writes the data in overwrite mode (I tried to mostly keep the previous implementation).
Either way, mode: SaveMode is required because there are two data writes, one for "delete" and one for "append".
I added some code to use "overwrite" mode when there are no appended files, to make sure everything is cleared before the write.
Thanks :)
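To make the discussion above concrete, here is a minimal sketch of the two writes as described in this thread; `writeIndex` and `lineageColumn` are hypothetical stand-ins, and the actual RefreshIncrementalAction goes through Hyperspace's bucketed write path, so details may differ:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

// Sketch only: the appended data is written first in Overwrite mode; the rows
// surviving the delete filter are then appended. If nothing was appended, the
// delete result itself is written with Overwrite so the previous index data is
// still replaced.
def refreshIncrementally(
    spark: SparkSession,
    previousIndexFiles: Seq[String],
    deletedFileNames: Seq[String],
    appendedDataDf: => DataFrame,
    hasAppendedFiles: Boolean,
    lineageColumn: String,
    writeIndex: (DataFrame, SaveMode) => Unit): Unit = {
  if (hasAppendedFiles) {
    // First write: index entries built from the newly appended source files.
    writeIndex(appendedDataDf, SaveMode.Overwrite)
  }
  if (deletedFileNames.nonEmpty) {
    // Second write: keep only entries whose lineage column does not point at a
    // deleted source data file.
    val survivingRows = spark.read
      .parquet(previousIndexFiles: _*)
      .filter(!col(lineageColumn).isin(deletedFileNames: _*))
    writeIndex(survivingRows, if (hasAppendedFiles) SaveMode.Append else SaveMode.Overwrite)
  }
}
```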
imback82 left a comment
LGTM, thanks @sezruby!
spark.read
  .parquet(previousIndexLogEntry.content.files.map(_.toString): _*)
  .filter(
    !col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(deletedFiles.map(_.name): _*))
| !col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(deletedFiles.map(_.name): _*)) | |
| !col(IndexConstants.DATA_FILE_NAME_COLUMN).isin(deletedFiles.map(_.name): _*)) |
     numBuckets: Int,
-    bucketByColNames: Seq[String]): Unit = {
+    bucketByColNames: Seq[String],
+    mode: SaveMode = SaveMode.Overwrite): Unit = {
Let's not use default value for the mode and be explicit.
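For illustration, a sketch of what the explicit-mode signature and its call sites could look like; the parameters beyond what the diff shows are assumed, and the real saveWithBuckets writes bucketed index data via internal Spark APIs rather than the plain parquet write used here:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object BucketedWriteSketch {
  // No default for `mode`: every caller must spell out whether it overwrites
  // the index contents or appends to them.
  def saveWithBuckets(
      df: DataFrame,
      path: String,
      numBuckets: Int,
      bucketByColNames: Seq[String],
      mode: SaveMode): Unit = {
    // Simplified stand-in for the bucketed write; only the mode handling matters here.
    df.write.mode(mode).parquet(path)
  }
}

// Hypothetical call sites, one per write in the incremental refresh:
// BucketedWriteSketch.saveWithBuckets(appendedRows, indexPath, numBuckets, indexedCols, SaveMode.Overwrite)
// BucketedWriteSketch.saveWithBuckets(survivingRows, indexPath, numBuckets, indexedCols, SaveMode.Append)
```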
  /**
-  * Index Refresh Event for appended source files. Emitted when refresh is called on an index
+  * Index Refresh Event for incremental mode. Emitted when refresh is called on an index
   * with config flag set to create index for appended source data files.
This needs to be updated.
private def queryPlanHasExpectedRootPaths(
    optimizedPlan: LogicalPlan,
    expectedPaths: Seq[Path]): Boolean = {
  assert(getAllRootPaths(optimizedPlan) === expectedPaths)
Why do we need this check when the caller is doing assert? For printing diff using ===?
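For context on the === question, a small ScalaTest sketch (names and values are made up): keeping the assert with === inside the helper yields a descriptive "did not equal" failure message, whereas returning only a Boolean leaves the outer assert reporting just that the condition was false.

```scala
import org.scalatest.funsuite.AnyFunSuite

class RootPathsSketchTest extends AnyFunSuite {
  // Hypothetical helper mirroring the pattern in the diff: the inner assert
  // prints both sequences when they differ, which the caller's plain
  // assert(helper(...)) could not do on its own.
  private def hasExpectedRootPaths(actual: Seq[String], expected: Seq[String]): Boolean = {
    assert(actual === expected)
    true
  }

  test("root paths match the expected locations") {
    assert(hasExpectedRootPaths(Seq("/data/part-0"), Seq("/data/part-0")))
  }
}
```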
What is the context for this pull request?
What changes were proposed in this pull request?
This PR merges RefreshAppendAction and RefreshDeleteAction as RefreshIncremental, which is required for #198. Without this, it is hard to calculate the index signature value considering deletedFiles and appendedFiles, as the spark.read API doesn't allow creating a DataFrame with non-existing file paths.
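As a small illustration of that constraint (the path below is hypothetical), Spark validates source paths when the DataFrame is created, so deleted files cannot simply be read back:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("path-check").getOrCreate()
try {
  // Fails at DataFrame creation time with "Path does not exist: ..."
  spark.read.parquet("/tmp/hyperspace-demo/deleted-part-00000.parquet")
} catch {
  case e: AnalysisException => println(s"Read failed as expected: ${e.getMessage}")
}
```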
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test