Support Delta Lake file-based source provider #265
Conversation
Btw, can you check the build failure?
Force-pushed 7bd4ee1 to e043245
#227 has been merged. Thanks!
Force-pushed 06b3034 to e12ef65
  .format(latestRelation.fileFormat)
  .options(latestRelation.options)
  .load(latestRelation.rootPaths: _*)
if (latestRelation.rootPaths.size == 1) {
Delta Lake only allows one path in load()
Yea, we can do something like:

val df = spark.read
  .schema(dataSchema)
  .format(latestRelation.fileFormat)
  .options(latestRelation.options)
// Due to the difference in how the "path" option is set: https://github.com/apache/spark/blob/ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L197,
// load() with a single parameter needs to be handled differently.
if (latestRelation.rootPaths.size == 1) {
  df.load(latestRelation.rootPaths.head)
} else {
  df.load(latestRelation.rootPaths: _*)
}
Btw, what happens if the delta lake implementation adds the "path" option in latestRelation.options?
https://github.com/microsoft/hyperspace/pull/265/files#diff-f7ecc68d1799e9c2c916973786081d5f35f312785537ccd2eed61bce41a10786R72
"path" key will be removed in createRelation.
import spark.implicits._
val dataPathColumn = "_data_path"
val lineageDF = fileIdTracker.getFileToIdMap.toSeq
val isDeltaLakeSource = DeltaLakeRuleUtils.isDeltaLakeSource(df.queryExecution.optimizedPlan)
Can we move this to the source provider somehow? Basically, we don't want source-specific implementations outside the provider.
Question: why does Delta not require the replace?
Delta's input_file_name() function returns a "file:/" prefix, not "file:///". I think we need to add an assert for this; the join result will be empty if they don't match.
I see. How about we normalize the output of input_file_name by removing "file:/" or "file:///"? (and the same for the getFileToIdMap keys)
Replacing the filename-to-id map is cheaper than normalizing all file paths of all rows. I'll add an API for this.
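For reference, the prefix mismatch could be handled with a small normalizer like the sketch below; `normalizeFileUri` is a hypothetical helper, not an existing Hyperspace API. Applying it once to the map keys (rather than to every row) matches the approach above:

```scala
object PathNormalizer {
  // Delta's input_file_name() yields URIs like "file:/tmp/t/part-0.parquet",
  // while FileStatus-derived keys may look like "file:///tmp/t/part-0.parquet".
  // Strip either scheme prefix so both sides of the lineage join agree.
  def normalizeFileUri(uri: String): String =
    if (uri.startsWith("file:///")) uri.stripPrefix("file://")
    else if (uri.startsWith("file:/")) uri.stripPrefix("file:")
    else uri
}
```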
src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala (outdated; comments resolved)
src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala (outdated; comments resolved)
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, Not}
import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
imback82
left a comment
Generally, the approach/integration looks good! I will do a detailed review this week.
@pirz @apoorvedave1 Could you take a look as well?
LGTM, thanks @sezruby
Note to reviewers: more hybrid scan + delta lake test cases - #274
Is the result comparison tested in #274?
Yes, checkAnswer is used to compare the results in #274.
pirz
left a comment
LGTM, Thanks @sezruby
  useBucketSpec: Boolean): LogicalPlan = {
  val isParquetSourceFormat = index.relations.head.fileFormat.equals("parquet")
  val fileFormat = index.relations.head.fileFormat
  val isParquetSourceFormat = fileFormat.equals("parquet") || fileFormat.equals("delta")
Hmm, does this mean adding a new source provider is not enough?
Can we introduce hasParquetAsSourceFormat to the provider API and record this info in the metadata?
You can do this as a separate PR if that is preferred. Please create an issue in that case.
I think this case is too specific to create an API on the source provider; also, it refers to the fileFormat string in the index metadata, not the relation.
So it's better to create the function in IndexLogEntry or some utils class if needed. WDYT? @imback82
You should be able to plug in a source provider defined externally without changing the Hyperspace codebase. For example, say I have a source format "blah" that uses parquet internally; how can I plug it in without modifying Hyperspace? One easy way to think about it is whether you could implement the Delta source provider outside Hyperspace.
I think this case is too specific to create an API to source provider; and it refers fileformat string in index metadata, not relation.
You can do this in the create path and record it in the metadata.
Got your point - it should be possible to add a new source provider externally.
Let me handle this with a new PR & issue. Thanks!
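The provider hook discussed in this thread might look like the sketch below; all names are illustrative, not the final Hyperspace API:

```scala
// Each source provider reports whether its data files are parquet under the
// hood; the answer can be recorded in the index metadata at creation time,
// instead of string-matching "parquet"/"delta" in RuleUtils.
trait FileBasedSourceProvider {
  def hasParquetAsSourceFormat: Boolean
}

class DefaultParquetSource extends FileBasedSourceProvider {
  override def hasParquetAsSourceFormat: Boolean = true
}

class DeltaLakeSource extends FileBasedSourceProvider {
  // Delta Lake stores its data as parquet files, so parquet-based
  // handling (e.g. bucketed reads) still applies.
  override def hasParquetAsSourceFormat: Boolean = true
}
```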
src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala (outdated; comments resolved)
src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala (outdated; comments resolved)
src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala (outdated; comments resolved)
imback82
left a comment
LGTM except for one pending comment about introducing hasParquetAsSourceFormat.
What is the context for this pull request?
Refactor #197 + #224 (except for hybrid scan test refactoring) based on #227.
What changes were proposed in this pull request?
This PR introduces DeltaLakeFileBasedSource to support indexes on Delta Lake sources.
Does this PR introduce any user-facing change?
Yes, users can create/refresh indexes on Delta Lake tables and also utilize Hybrid Scan.
How was this patch tested?
Unit tests