Introduce SourceRelation/FileBasedRelation traits to remove direct dependency on LogicalRelation from actions/rules #355
|
@imback82 Just some early feedback... I've seen you added the |
|
We need to have files-related APIs such as Similar to |
|
First, regarding Iceberg and DataSourceV2... The Iceberg source does not use and index (like the Second, it does NOT look right to have the In the end, if you still consider that this will bring clarity, please complete this PR ASAP and merge it, so that I could modify my PRs to fit these changes. Thank you. |
Why not? I see that you are implementing I will get this ready for review by eod today as promised. |
| } | ||
| } | ||
|
|
||
| protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] = |
This is no longer needed as creating Relation has moved to FileBasedRelation.
| (indexDF, resolvedIndexedColumns, resolvedIncludedColumns) | ||
| } | ||
|
|
||
| private def getPartitionColumns(df: DataFrame): Seq[String] = { |
This is no longer needed as FileBasedRelation supports getting the partition schema.
| Hyperspace | ||
| .getContext(spark) | ||
| .sourceProviderManager | ||
| .allFiles(relation) |
allFiles has moved to FileBasedRelation.
| RuleUtils.getLogicalRelation(l).isDefined && RuleUtils.getLogicalRelation(r).isDefined && | ||
| isPlanLinear(l) && isPlanLinear(r) && !isPlanModified(l) && !isPlanModified(r) && | ||
| ensureAttributeRequirements(l, r, condition) | ||
| isPlanLinear(l) && isPlanLinear(r) && |
@apoorvedave1 we don't need isPlanLinear if getRelation is implemented as follows (collecting the leaves and making sure there is exactly one), right?
def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
val leaves = plan.collectLeaves()
if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
Some(provider.getRelation(leaves.head))
} else {
None
}
}
Yeah, that makes sense; we can eliminate the isPlanLinear check.
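To spell out why the single-leaf check makes a separate linearity check redundant: in a tree, any node with two or more children contributes at least two leaves, so exactly one leaf means every node has at most one child. The sketch below shows this on a toy plan type. `Plan`, `Scan`, `Filter`, `Join`, and the `supported` flag are hypothetical stand-ins for illustration only, not Spark's `LogicalPlan` or the Hyperspace provider API.

```scala
object SingleLeafSketch {
  // Toy stand-in for a logical plan node (assumption: not Spark's LogicalPlan).
  sealed trait Plan {
    def children: Seq[Plan]
    // Same idea as collectLeaves(): gather every node that has no children.
    def collectLeaves(): Seq[Plan] =
      if (children.isEmpty) Seq(this) else children.flatMap(_.collectLeaves())
  }
  // `supported` plays the role of provider.isSupportedRelation (hypothetical flag).
  final case class Scan(name: String, supported: Boolean = true) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Filter(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // Returns the relation only when the plan has exactly one leaf and it is supported.
  def getRelation(plan: Plan): Option[Scan] =
    plan.collectLeaves() match {
      case Seq(s: Scan) if s.supported => Some(s)
      case _ => None
    }
}
```

With this shape, `Filter(Scan("t"))` yields a relation, while a `Join` of two scans yields `None` without any explicit linearity test.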
| * @param plan Logical plan. | ||
| * @return true if the relation in the plan is modified by Hyperspace. | ||
| */ | ||
| private def isPlanModified(plan: LogicalPlan): Boolean = { |
Removed because this is directly tested in isApplicable above.
| Hyperspace | ||
| .getContext(spark) | ||
| .sourceProviderManager | ||
| .allFiles(rel) |
again, allFiles has moved to FileBasedRelation (see below).
| .partitionBasePath(location) | ||
| case l: LeafNode if provider.isSupportedRelation(l) => | ||
| val relation = provider.getRelation(l) | ||
| val options = relation.partitionBasePath |
@sezruby Question. Below line 482, do we need to create a tag on the originalPlan or can it be just the logical plan for the relation?:
val newLocation = index.withCachedTag(
originalPlan,
IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN_APPENDED) {
new InMemoryFileIndex(spark, filesAppended, options, None)
}
getCandidateIndexes uses the relation plan, so I was curious.
Yea, getCandidateIndexes uses a relation plan, but here I used originalPlan.
It might be better to use the relation plan here as it's more reusable.
thanks, we can do that as a follow up (wanted to minimize the behavior change in this PR)
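One possible reading of "more reusable" in the exchange above: if the cached value is keyed on the whole query plan, two different queries over the same relation each build their own value, whereas keying on the relation plan lets them share it. The sketch below illustrates that reading with a toy cache. `TagCache`, the string plan keys, and the tag name are hypothetical and do not reflect how `withCachedTag` is actually implemented in Hyperspace.

```scala
import scala.collection.mutable

object CachedTagSketch {
  // Hypothetical cache keyed by (plan key, tag name); an assumption for illustration only.
  final class TagCache {
    private val tags = mutable.Map.empty[(String, String), Any]
    var computeCount = 0

    // Computes the value only on the first request for a given (plan, tag) pair.
    def withCachedTag[T](planKey: String, tag: String)(compute: => T): T =
      tags.getOrElseUpdate((planKey, tag), { computeCount += 1; compute }).asInstanceOf[T]
  }

  // Counts how often the value is built for a list of (query plan, relation plan) pairs.
  def builds(queries: Seq[(String, String)], keyOnRelation: Boolean): Int = {
    val cache = new TagCache
    queries.foreach { case (queryPlan, relationPlan) =>
      val key = if (keyOnRelation) relationPlan else queryPlan
      // The tag name is a placeholder string, not the real IndexLogEntryTags constant.
      cache.withCachedTag(key, "HYBRID_SCAN_APPENDED")(s"file index for $relationPlan")
    }
    cache.computeCount
  }
}
```

For two queries such as `("Filter(t)", "t")` and `("Project(t)", "t")`, keying on the query plan builds the value twice, while keying on the relation plan builds it once.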
| */ | ||
| def refreshRelation(relation: Relation): Relation = { | ||
| run(p => p.refreshRelation(relation)) | ||
| def refreshRelationMetadata(relation: Relation): Relation = { |
We have so many "Relation"s, so I am explicitly calling it out as relation metadata here now.
| /** | ||
| * Implementation for file-based relation used by [[DefaultFileBasedSource]] | ||
| */ | ||
| class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRelation) |
Most of the implementation is from DefaultFileBasedSource, with minor changes to the return types: we no longer need to wrap them with Option.
| formats.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet | ||
| }) | ||
|
|
||
| /** |
Removed code has moved to DefaultFileBasedRelation.
|
@andrei-ionescu @sezruby @apoorvedave1 This is ready for review now (I will update the PR description soon, but I believe you already know the context. 😄 ). Basically, I removed the direct dependency on @andrei-ionescu Now, you can just implement |
This reverts commit 22f017a.
we worked on this together Co-Authored-By: Andrei Ionescu <webdev.andrei@gmail.com>
|
@andrei-ionescu I also added you as a co-author in this commit 50ed5cd, so when this PR is merged, you will get the co-authorship. Thanks! |
|
@imback82 Thanks for co-authoring! |
|
The approach looks great. Thanks :) |
apoorvedave1
left a comment
LGTM, 👍 thanks @imback82
imback82
left a comment
Thanks @andrei-ionescu @sezruby @apoorvedave1 for the review!
I will create a follow up PR to address a couple of comments.
What is the context for this pull request?
#321 and #320 require support for DatasourceV2Relation, but the current provider was designed to support only LogicalRelation, so adding a new provider that supports a different relation type requires many code changes across actions/rules.
What changes were proposed in this pull request?
This proposes to introduce one more abstraction to FileBasedSourceProvider such that each provider now needs to implement FileBasedRelation. For example, DeltaLakeFileBasedSource needs to implement DeltaRelation, which extends FileBasedRelation, to handle a Delta Lake specific relation.
By decoupling this, actions/rules do not depend on LogicalRelation directly.
Does this PR introduce any user-facing change?
No
How was this patch tested?
API refactoring. Existing tests should be enough.
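For readers skimming the description, here is a rough sketch of the decoupling it proposes. The names `FileBasedRelation`, `FileBasedSourceProvider`, `DefaultFileBasedRelation`, `DeltaRelation`, `isSupportedRelation`, `getRelation`, `plan`, and `allFiles` come from this thread; everything else (the toy `Leaf` types, `partitionColumns`, the method signatures, and `indexableFiles`) is assumed for illustration and is not the actual Hyperspace API.

```scala
object FileBasedRelationSketch {
  // Toy stand-ins for plan leaves (assumption: the real code works with Spark plan nodes).
  sealed trait Leaf
  final case class DefaultLeaf(files: Seq[String], partitionColumns: Seq[String]) extends Leaf
  final case class DeltaLeaf(snapshotFiles: Seq[String], partitionColumns: Seq[String]) extends Leaf
  final case class UnknownLeaf(name: String) extends Leaf

  // What actions/rules would depend on instead of a concrete relation type.
  trait FileBasedRelation {
    def plan: Leaf
    def allFiles: Seq[String]
    def partitionColumns: Seq[String]
  }

  class DefaultFileBasedRelation(override val plan: DefaultLeaf) extends FileBasedRelation {
    def allFiles: Seq[String] = plan.files
    def partitionColumns: Seq[String] = plan.partitionColumns
  }

  class DeltaRelation(override val plan: DeltaLeaf) extends FileBasedRelation {
    def allFiles: Seq[String] = plan.snapshotFiles
    def partitionColumns: Seq[String] = plan.partitionColumns
  }

  // Each provider recognizes its own leaf type and wraps it in a relation.
  trait FileBasedSourceProvider {
    def isSupportedRelation(leaf: Leaf): Boolean
    def getRelation(leaf: Leaf): FileBasedRelation
  }

  object DefaultFileBasedSource extends FileBasedSourceProvider {
    def isSupportedRelation(leaf: Leaf): Boolean = leaf.isInstanceOf[DefaultLeaf]
    def getRelation(leaf: Leaf): FileBasedRelation = leaf match {
      case d: DefaultLeaf => new DefaultFileBasedRelation(d)
      case other => throw new IllegalArgumentException(s"Unsupported leaf: $other")
    }
  }

  object DeltaLakeFileBasedSource extends FileBasedSourceProvider {
    def isSupportedRelation(leaf: Leaf): Boolean = leaf.isInstanceOf[DeltaLeaf]
    def getRelation(leaf: Leaf): FileBasedRelation = leaf match {
      case d: DeltaLeaf => new DeltaRelation(d)
      case other => throw new IllegalArgumentException(s"Unsupported leaf: $other")
    }
  }

  // Hypothetical caller: it only sees FileBasedRelation, never a concrete relation type.
  def indexableFiles(providers: Seq[FileBasedSourceProvider], leaf: Leaf): Option[Seq[String]] =
    providers.find(_.isSupportedRelation(leaf)).map(_.getRelation(leaf).allFiles)
}
```

In this sketch, supporting another source means adding one provider and one relation class; `indexableFiles` stays unchanged.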