
Conversation

@sezruby (Collaborator) commented Oct 21, 2020

What is the context for this pull request?

Please review only the last commit to see the diff, since the base (#197) is not yet merged.

What changes were proposed in this pull request?

This PR enables Hybrid Scan for Delta Lake relations.
A new utility function, def retrieveCurFileInfos(plan: LogicalPlan), is defined; it returns the current file list of a given logical plan regardless of the relation type.

Later, we can optimize getCandidateIndex for Hybrid Scan by using Delta Lake's version info and path.
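
A minimal sketch of what such a utility could look like (the (path, size, modifiedTime) shape of FileInfo is an assumption; the PR's actual code pattern-matches concrete FileIndex types, while this sketch lists files through Spark's stable FileIndex.listFiles interface):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}

// Assumed shape of Hyperspace's file metadata holder, for illustration only.
case class FileInfo(name: String, size: Long, modifiedTime: Long)

def retrieveCurFileInfos(plan: LogicalPlan): Seq[FileInfo] = {
  val fileInfos = plan.collect {
    // Plain HadoopFsRelations (parquet, csv, ...) and Delta Lake's
    // TahoeLogFileIndex both implement FileIndex, so listing through the
    // interface hides the relation type from the caller.
    case LogicalRelation(HadoopFsRelation(location: FileIndex, _, _, _, _, _), _, _, _) =>
      location.listFiles(Nil, Nil).flatMap(_.files).map { f =>
        FileInfo(f.getPath.toString, f.getLen, f.getModificationTime)
      }
  }
  fileInfos.flatten
}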

Does this PR introduce any user-facing change?

Yes, users can now utilize Hybrid Scan with Delta Lake relations.

How was this patch tested?

Unit test

sezruby mentioned this pull request Oct 21, 2020
sezruby changed the title from "Support Hybrid Scan for Delta Lake" to "Support Hybrid Scan with Delta Lake relation" Oct 21, 2020
sezruby self-assigned this Oct 21, 2020
sezruby added the enhancement (New feature or request) label Oct 21, 2020
sezruby added this to the November 2020 milestone Oct 21, 2020
sezruby marked this pull request as draft October 22, 2020 15:22
rapoth modified the milestones: October 2020, November 2020 Oct 29, 2020
sezruby marked this pull request as ready for review November 9, 2020 11:56
sezruby force-pushed the deltamulti branch 2 times, most recently from dbb04f7 to fa84c7e on November 9, 2020 12:05
import spark.implicits._
val dfFromSample = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks")
hyperspace = new Hyperspace(spark)

@sezruby (Collaborator, Author):

Note: removed pre-construction of indexes; each test now creates its own data and index.

basePlan
.equals(filter.queryExecution.optimizedPlan)
.equals(!transformationExpected))
withTempPathAsString { testPath =>
@sezruby (Collaborator, Author):

Note: "withTempPathAsString" was added just for this test.

}
}

test("Delete-only: filter rule, number of delete files threshold") {
@sezruby (Collaborator, Author):

Note: moved from HybridScanTestSuite, since this test does not need to run for different source file formats.

hyperspace = new Hyperspace(spark)
}

FileUtils.delete(new Path(sampleDataLocationRoot))
@sezruby (Collaborator, Author):

Note: removed pre-built indexes

indexConfig: IndexConfig,
appendCnt: Int,
deleteCnt: Int): (Seq[String], Seq[String]) = {
dfFromSample.write.partitionBy("clicks", "Date").format(sourceFileFormat).save(sourcePath)
@sezruby (Collaborator, Author):

Note: since this function needs to create the source data accordingly, an overriding function was added for partitioned data.
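
A hedged sketch of the kind of override the note describes (the helper name createSourceData and the enclosing suite structure are assumptions; only the write itself comes from the quoted line):

// Partitioned variant of the assumed source-data helper: the base suite
// writes unpartitioned data, this override writes with partition columns.
override protected def createSourceData(sourcePath: String): Unit = {
  dfFromSample.write
    .partitionBy("clicks", "Date")
    .format(sourceFileFormat)
    .save(sourcePath)
}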

Comment on lines +70 to +71
// Exclude "versionAsOf" and "timestampAsOf" options, so that this DataFrame for refresh
// can see the latest snapshot.
Contributor:

Does this mean that a user cannot create an index on a previous version?

@sezruby (Collaborator, Author):

No. This is the refresh action, so we should remove the versionAsOf or timestampAsOf option and refresh based on the latest version.
If a user wants to refresh against a specific version rather than the latest, we would need to extend the refresh API.
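
A minimal sketch of that refresh-time behavior (spark, originalOptions, and deltaTablePath are illustrative names; only dropping the two time-travel options mirrors the quoted code comment):

// Options captured at index creation may pin an old snapshot via time travel.
val originalOptions = Map("versionAsOf" -> "3")
// Drop the time-travel options so the refresh reads the latest snapshot.
val refreshOptions = originalOptions - "versionAsOf" - "timestampAsOf"
val latestDf = spark.read
  .format("delta")
  .options(refreshOptions)
  .load(deltaTablePath)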

// it will be matched again and transformed recursively which causes stack overflow exception.
case baseRelation @ LogicalRelation(
-    _ @HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
+    _ @HadoopFsRelation(baseLocation: FileIndex, _, _, _, _, _),
Contributor:

Could you restrict it to PartitioningAwareFileIndex or TahoeFileIndex? There are other FileIndex implementations which we don't support / haven't fully investigated, and we don't know what the behavior could be.

@sezruby (Collaborator, Author):

In that case, an exception will be thrown by the pluggable source provider implementation.

Contributor:

Got it. Could you please add that as a comment in the code then? Otherwise this part is hard for reviewers to figure out.
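
A hedged sketch of the requested restriction (transformPlan is a hypothetical stand-in for the rule body in the PR diff):

// Match only the file-index types known to be supported. Any other FileIndex
// implementation falls through, and the pluggable source-provider lookup
// throws for it, as explained above.
case baseRelation @ LogicalRelation(
    HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _, _) =>
  transformPlan(baseRelation, location)
case baseRelation @ LogicalRelation(
    HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _), _, _, _) =>
  transformPlan(baseRelation, location)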

Some(basePath.toString)
case d: TahoeLogFileIndex =>
Some(d.path.toString)
}
Contributor:

This will throw a MatchError for file indexes that are neither PartitioningAwareFileIndex (PAFI) nor TahoeLogFileIndex (TLFI).
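
A minimal sketch of the fix this implies: add a wildcard case so any other FileIndex yields None instead of throwing scala.MatchError (the basePath derivation in the first branch is assumed):

val pathOpt: Option[String] = location match {
  case p: PartitioningAwareFileIndex =>
    p.rootPaths.headOption.map(_.toString) // assumed basePath derivation
  case d: TahoeLogFileIndex =>
    Some(d.path.toString)
  case _ =>
    None // neither PAFI nor TLFI: unsupported, avoid MatchError
}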

}

def retrieveCurFileInfos(plan: LogicalPlan): Seq[FileInfo] = {
val ret = plan.collect {
Contributor:

rename ret => fileInfos?

Contributor:

or groupedFileInfos or something?

@sezruby (Collaborator, Author) commented Nov 25, 2020

This PR will be delivered by #265.

@sezruby (Collaborator, Author) commented Dec 9, 2020

Closed by #265

sezruby closed this Dec 9, 2020
sezruby deleted the deltamulti branch January 14, 2021 08:27