Add Delta Lake version history to IndexLogEntry for efficient time travel query #272
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
  with Action {
- final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
+ final override def logEntry: LogEntry =
+   getIndexLogEntry(spark, df, indexConfig, indexDataPath, super[Action].endId)
Note: we need the index version id here, but it is not available in CreateActionBase.getIndexLogEntry.
So I added an endId function and `with Action`.
@pirz @apoorvedave1 Could you review the change and comment on the approach of keeping the version history in properties? Thanks!
src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
// Get the timestamp string for time travel query using "timestampAsOf" option.
val timestampStr = getSparkFormattedTimestamps(System.currentTimeMillis).head
// Sleep 1 second because the unit of commit timestamp is second.
Thread.sleep(1000)
Avoid using sleep in unit tests: it takes too much time, especially when this method is called multiple times.
To simulate time, keep a global time variable, currTime = System.currentTimeMillis, add 1000 to it whenever you want to move time forward, and pass it to the getSparkFormattedTimestamps function.
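The suggestion above could look roughly like the sketch below. FakeClock and formatTimestamp are hypothetical names introduced only for illustration; formatTimestamp stands in for getSparkFormattedTimestamps, whose real implementation is not shown in this thread. Note that a simulated clock only controls the timestamps the test itself generates; it does not change the commit timestamps that the storage layer records on its own.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Hypothetical test clock: it only moves when the test tells it to.
final class FakeClock(startMillis: Long) {
  private var currTime: Long = startMillis

  def now: Long = currTime

  // Moves the clock forward (1 second by default) and returns the new time.
  def advance(millis: Long = 1000L): Long = {
    currTime += millis
    currTime
  }
}

// Hypothetical stand-in for the test helper mentioned in the thread.
def formatTimestamp(millis: Long): String = {
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
  fmt.format(new Date(millis))
}

val clock = new FakeClock(0L)
val t0 = formatTimestamp(clock.now)       // timestamp before the "update"
val t1 = formatTimestamp(clock.advance()) // one simulated second later
```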
I added this because Delta Lake internally records the commit time in seconds. The sleep forces each version update to be recorded with a different time, so that I can test the query with a timestamp properly.
I don't think so. I ran your test and used DeltaTable.forPath(spark, dataPath).history().show(10, false) to view the timestamp (2021-02-02 14:49:19.077). It uses milliseconds, and each operation should have a readVersion?
You can use the timestamp in the Delta table to store read versions?
It seems the Azure test framework needs the sleep for some reason :D (see f19614b).
I might have added this after debugging / investigation.
> I added this because Delta Lake internally records the commit time in seconds. The sleep forces each version update to be recorded with a different time, so that I can test the query with a timestamp properly.
Maybe add this comment to the code? Reading the existing comment, I didn't know why we had to sleep.
 */
trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession {
  protected val baseId: Int = logManager.getLatestId().getOrElse(-1)
  protected def endId: Int = baseId + 2
Sorry, why +2? Can you please explain?
It's because we write the log twice: at begin() using baseId + 1, and at end() using baseId + 2.
Since begin() writes an in-progress state (e.g. creating), Hyperspace always reads the log file written in end().
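To make the id arithmetic concrete, here is a simplified model of the two log writes described above. It is only a sketch: Entry, InMemoryLog, and runAction are hypothetical names, and the state strings other than the in-progress "creating" state are assumptions chosen for illustration, not taken from the Hyperspace code.

```scala
// Hypothetical, simplified model of the two-phase log write.
final case class Entry(id: Int, state: String)

final class InMemoryLog {
  private var entries = Vector.empty[Entry]

  def latestId: Option[Int] = entries.lastOption.map(_.id)
  def write(e: Entry): Unit = entries = entries :+ e
  def all: Vector[Entry] = entries
}

// Runs one action and returns the id of the entry written at the end.
def runAction(log: InMemoryLog, transientState: String, finalState: String): Int = {
  val baseId = log.latestId.getOrElse(-1)
  log.write(Entry(baseId + 1, transientState)) // begin(): in-progress entry
  // op() would do the actual work here.
  val endId = baseId + 2
  log.write(Entry(endId, finalState))          // end(): the entry readers use
  endId
}

val log = new InMemoryLog
val createEndId = runAction(log, "CREATING", "ACTIVE")    // writes ids 0 and 1
val refreshEndId = runAction(log, "REFRESHING", "ACTIVE") // writes ids 2 and 3
```

In this model, as long as every action completes, the in-progress entries get even ids and the final entries get odd ids, which is why endId can be computed up front as baseId + 2.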
nit: Can we rename these to firstLogEntryId and lastLogEntryId, and add a comment explaining them?
Also, should we have an assert that firstLogEntryId is always -1 or an odd number?
baseId is not firstLogEntryId, so the rename would require some changes.
If an operation failed in op(), then firstLogEntryId could be an odd number (though failures are not handled properly now, e.g. #248).
This is required because LogEntry.id is not available when creating an IndexLogEntry.
Can we add final modifier?
@sezruby Could you fix the conflicts?
This can be removed now?
}

/**
 * Returns index version related properties.
nit: That looks like something for @return below?
Yes, but this line is consistent with src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala, and the other functions/source providers have their comments written the same way.
Let's keep this for consistency for now. Thanks!
@sezruby Could you resolve the conflicts? Let's try to get this in next (sorry for the delay). @apoorvedave1 Can you also help with the review when you get a chance? Thanks.
imback82 left a comment
Almost there
 * @param logVersion Index log version to retrieve.
 * @return IndexLogEntry if the index of the given log version exists, otherwise None.
 */
def getIndexLogEntry(
We have def getIndexes(states: Seq[String] = Seq()): Seq[IndexLogEntry]. Should we make the naming consistent? Also, we should expose an API to return log versions: #272 (comment).
If you want, you can create a separate PR that just updates IndexManager APIs, but I am also fine if you do it in this PR.
Will do this in a follow-up PR.
sounds good!
imback82 left a comment
LGTM (few minor comments), thanks @sezruby!
  .getOrElse(DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY, "")

// The value is comma separated versions - <index log version>:<delta table version>.
// e.g. "1:2,3:5,5:9"
// Versions are processed in reverse order to keep the higher index log version in case different index
// log versions refer to the same Delta Lake version.
// For example, "1:1, 2:2, 3:2" will become Seq((1, 1), (3, 2)).
Something like the above, if I understand this correctly?
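For reference, the parsing and de-duplication described in that suggested comment could be sketched as follows. parseVersionHistory is a hypothetical helper written for this illustration, not the code in the PR; it assumes the property value has the "<index log version>:<delta table version>" format quoted above and that entries are listed in ascending index log version.

```scala
// Hypothetical helper: parses "1:2,3:5,5:9" into (index log version, delta version) pairs.
def parseVersionHistory(value: String): Seq[(Int, Long)] = {
  val pairs = value
    .split(",")
    .map(_.trim)
    .filter(_.nonEmpty)
    .toSeq
    .map { pair =>
      pair.split(":").map(_.trim) match {
        case Array(indexVer, deltaVer) => (indexVer.toInt, deltaVer.toLong)
        case _ => throw new IllegalArgumentException(s"Malformed version pair: $pair")
      }
    }

  // Walk from the newest entry backwards, so that for each delta version the
  // first pair seen (the highest index log version) is the one that is kept.
  pairs.reverse.foldLeft(List.empty[(Int, Long)]) { (acc, pair) =>
    if (acc.exists(_._2 == pair._2)) acc else pair :: acc
  }
}

val deduped = parseVersionHistory("1:1, 2:2, 3:2")
```

Prepending while folding over the reversed input restores ascending order in the result, so no extra sort is needed.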
What is the context for this pull request?
Fixes #270
What changes were proposed in this pull request?
This PR improves Hybrid Scan for time travel queries on Delta Lake tables by using the older index version that is closest to the requested time travel Delta version.
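As a rough illustration of the selection step, the sketch below picks an index log version from a parsed version history. pickClosestIndexVersion is a hypothetical function, and it assumes "closest" means the smallest absolute difference between Delta table versions; the rule actually used in the PR may differ (for example, it could compare the amount of changed data instead).

```scala
// history: (index log version, delta table version) pairs.
// Assumption: "closest" = smallest absolute distance between delta versions.
def pickClosestIndexVersion(
    history: Seq[(Int, Long)],
    targetDeltaVersion: Long): Option[Int] = {
  if (history.isEmpty) {
    None
  } else {
    val (indexVersion, _) = history.minBy { case (_, deltaVersion) =>
      math.abs(deltaVersion - targetDeltaVersion)
    }
    Some(indexVersion)
  }
}

// Mirrors the "1:2,3:5,5:9" example format from the code comment in this PR.
val history = Seq((1, 2L), (3, 5L), (5, 9L))
val forVersion6 = pickClosestIndexVersion(history, 6L) // delta version 5 is nearest
val forVersion1 = pickClosestIndexVersion(history, 1L) // delta version 2 is nearest
```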
This PR includes:
closestIndexVersion API in source provider
Does this PR introduce any user-facing change?
Yes, an older version of an index can be applied to a time travel query on Delta Lake.
How was this patch tested?
Unit tests