
Conversation

@andrei-ionescu
Contributor

What is the context for this pull request?

What changes were proposed in this pull request?

This PR adds support for Iceberg.

The following changes are in this PR, and each of them is a separate commit:

Does this PR introduce any user-facing change?

No. The main changes to user-facing APIs are in the #321 PR. Detailed information can be found in the #318 proposal.

How was this patch tested?

  1. Integration test added for the new functionality
  2. Tests run locally and on the Databricks Runtime
  • Local build
sbt publishLocal
  • Run Spark shell with Hyperspace and Iceberg libraries loaded
$ spark-shell \
--driver-memory 4g \
--packages "com.microsoft.hyperspace:hyperspace-core_2.11:0.4.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime:0.10.0" \
--driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006 -XX:+UseG1GC -Dlog4j.debug=true"
  • Paste the following code
import org.apache.spark.sql._
import spark.implicits._ // pre-imported in spark-shell; needed when running elsewhere
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._
import scala.collection.JavaConverters._
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.TableProperties
import org.apache.iceberg.spark._
import org.apache.iceberg.hadoop._

val hs = new Hyperspace(spark)

// create an unpartitioned Iceberg table at ./table3
val props = Map(TableProperties.WRITE_NEW_DATA_LOCATION -> "table3").asJava
val sourceDf = Seq((1, "name1"), (2, "name2")).toDF("id", "name")
val schema = SparkSchemaUtil.convert(sourceDf.schema)
val part = PartitionSpec.builderFor(schema).build() // unpartitioned spec
val icebergTable = new HadoopTables().create(schema, part, props, "table3")
sourceDf.write.mode("overwrite").format("iceberg").save("./table3")

// read created table
val iceDf = spark.read.format("iceberg").load("./table3")

// create indexes
hs.createIndex(iceDf, IndexConfig("index_ice0", indexedColumns = Seq("id"), includedColumns = Seq("name")))
hs.createIndex(iceDf, IndexConfig("index_ice1", indexedColumns = Seq("name")))

// verify plans
val query = iceDf.filter(iceDf("id") === 1).select("name")
hs.explain(query, verbose = true)

@andrei-ionescu
Contributor Author

andrei-ionescu commented Feb 15, 2021

@imback82 I closed PRs #321 and #320, which became obsolete after your refactoring work in #355 - thanks for it! Please review this PR that adds support for the Iceberg table format.

@imback82 imback82 added the enhancement New feature or request label Feb 16, 2021
@imback82 imback82 added this to the February 2021 (v0.5.0) milestone Feb 16, 2021
Contributor

@imback82 imback82 left a comment


A few minor/nit comments, but generally this is looking good to me.

I don't know every detail of Iceberg, but since the changes are self-contained, I think this is good to go. @sezruby could you also take a look? Thanks.

Collaborator

@sezruby sezruby left a comment


Generally looks good to me! Could you also consider adding a HybridScanForIcebergTest, like HybridScanForDeltaLakeTest?

@andrei-ionescu
Contributor Author

@sezruby The test for hybrid scan is in IcebergIntegrationTest, lines 299-338.

Contributor

@imback82 imback82 left a comment


LGTM (pending minor/nit comments + @sezruby's comments), thanks @andrei-ionescu!

@sezruby
Collaborator

sezruby commented Feb 17, 2021

@sezruby The test for hybrid scan is in IcebergIntegrationTest, lines 299-338.

Yes, but we need to verify the exact plan transformation and the result of the query.
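
A minimal sketch of the kind of check meant here, reusing iceDf and the index names from the example at the top of this thread (the actual test's assertions may differ):

import com.microsoft.hyperspace._

// Baseline rows with Hyperspace disabled.
spark.disableHyperspace()
val expected = iceDf.filter(iceDf("id") === 1).select("name").collect().toSet

// With Hyperspace enabled, the same query should be rewritten to read from the index ...
spark.enableHyperspace()
val optimized = iceDf.filter(iceDf("id") === 1).select("name")
assert(optimized.queryExecution.optimizedPlan.toString.contains("index_ice0"))

// ... and still return the same rows.
assert(optimized.collect().toSet == expected)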

@andrei-ionescu
Contributor Author

@sezruby I took the Delta Lake hybrid scan test and modified it for Iceberg; that is what is in the integration test, lines 299 to 338. Is there another hybrid scan test for Delta Lake that I missed?

@sezruby
Collaborator

sezruby commented Feb 17, 2021

@andrei-ionescu andrei-ionescu force-pushed the iceberg_support branch 3 times, most recently from 5a23a11 to 5e89eee on February 19, 2021 at 20:24
@andrei-ionescu
Contributor Author

@sezruby I added HybridScanForIcebergTest.scala. Please have another look.

Collaborator

@sezruby sezruby left a comment


@andrei-ionescu Thanks! It seems to work as expected! 👍👍
For the start & end snapshot id test, I'd just like to check the use case & see how those configs work (or don't) with Hyperspace.

@andrei-ionescu andrei-ionescu force-pushed the iceberg_support branch 2 times, most recently from 12347df to a158175 on February 19, 2021 at 21:22
@andrei-ionescu
Contributor Author

@sezruby

For the start & end snapshot id test, I'd just like to check the use case & see how those configs work (or don't) with Hyperspace.

Is there such a check for Delta that I could take inspiration from?

@sezruby
Collaborator

sezruby commented Feb 19, 2021

@andrei-ionescu No, I'm not sure Delta has that feature.
How about this? (A rough sketch follows the list below.)

  • build test data

    • snapshot 1) add a row with value 1 (append 1 file)
    • snapshot 2) add a row with value 2 (append 1 file)
    • snapshot 3) add a row with value 3 (append 1 file)
    • snapshot 4) add a row with value 4 (append 1 file)
    • snapshot 5) add a row with value 5 (append 1 file)
  • build index with df with start-snapshot-id 2, end-snapshot-id 4

    • query with df with start-snapshot-id 2, end-snapshot-id 4 w/o hybrid scan
    • query with df with start-snapshot-id 3, and check w/ hybrid scan
    • query with df with end-snapshot-id 3, and check w/ hybrid scan
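
A rough sketch of how that test data could be built, assuming the Iceberg Spark source's incremental read options start-snapshot-id/end-snapshot-id; the table path and variable names are illustrative, and the table is assumed to already exist (created as in the example at the top of this thread):

import scala.collection.JavaConverters._
import org.apache.iceberg.hadoop.HadoopTables

// Five single-file appends, producing five snapshots.
(1 to 5).foreach { i =>
  Seq((i, s"name$i")).toDF("id", "name")
    .repartition(1)
    .write.format("iceberg").mode("append").save("table_snap")
}

// Snapshot ids in commit order.
val snapshotIds = new HadoopTables().load("table_snap")
  .history().asScala.map(_.snapshotId())

// Incremental read: only the rows committed between the two snapshots.
val incDf = spark.read.format("iceberg")
  .option("start-snapshot-id", snapshotIds(1))
  .option("end-snapshot-id", snapshotIds(3))
  .load("table_snap")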

@andrei-ionescu
Contributor Author

andrei-ionescu commented Feb 19, 2021

@sezruby I'll try to see if it is possible, but from my knowledge of Iceberg I don't think this kind of test is possible. The snapshot ids are used for time travel and to isolate data at read time. A snapshot is a sort of full image of the dataset's metadata; it is not a delta from the previous version. This is a bit different from Delta.

The parameters are extracted from the IcebergSource, which knows how to properly access the Iceberg table by getting the latest version and retrieving the files attached to that version. The scope of those parameters is only to properly get the list of files.

Delta has the same parameter under versionAsOf. If there are tests that validate Delta's time travel functionality, I'll do my best to add them to Iceberg too, although the implementation is a bit different.

And in regards to time travel, I don't think we have any functionality yet that allows us to link a version of the table to a version of the index.

I would suggest having a separate PR for Iceberg time travel at the right time.

@sezruby
Collaborator

sezruby commented Feb 19, 2021

@andrei-ionescu I guess Delta Lake's versionAsOf is the same as Iceberg with only end-snapshot-id.
But a df with both start-snapshot-id and end-snapshot-id handles the "delta" dataset between two snapshots, right? (See the sketch below.)

Hybrid scan will work w/o any link to version info - it utilizes the list of source files from the DataFrame.

For time travel query optimization, to pick the proper version of a candidate index (if it has multiple versions from refreshes), I added version history info in #272 to link the Delta version and the index version.
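
For reference, a sketch of the two read modes being contrasted here, with option names per the Delta Lake and Iceberg Spark sources (the paths and the startId/endId values are illustrative):

// Delta Lake time travel: a full snapshot of the table as of one version.
val deltaDf = spark.read.format("delta")
  .option("versionAsOf", 3)
  .load("/tmp/delta-table")

// Iceberg incremental read: only the data committed between two snapshots
// (startId/endId would come from the table's snapshot history).
val iceIncDf = spark.read.format("iceberg")
  .option("start-snapshot-id", startId)
  .option("end-snapshot-id", endId)
  .load("table3")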

@andrei-ionescu
Contributor Author

andrei-ionescu commented Feb 20, 2021

@sezruby: I refactored it a bit, and it no longer needs knowledge of start-snapshot-id, end-snapshot-id, and other internal Iceberg properties. That task method was in part inspired by Iceberg code, but I decided to use the higher-level Iceberg API newScan().planFiles() to get all files.
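
A minimal sketch of what that higher-level call looks like (table location illustrative):

import scala.collection.JavaConverters._
import org.apache.iceberg.hadoop.HadoopTables

// Enumerate the data files of the table's current snapshot,
// without touching snapshot-id options or other internals.
val table = new HadoopTables().load("table3")
val filePaths = table.newScan().planFiles().iterator().asScala
  .map(_.file().path().toString)
  .toSeq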

@sezruby
Collaborator

sezruby commented Feb 20, 2021

LGTM thanks @andrei-ionescu!

@andrei-ionescu
Contributor Author

@rapoth, @imback82, @sezruby: The PR already has 2 LGTMs - thanks @imback82 & @sezruby. What's the process? What are the next steps to get it merged and complete the feature?

@imback82
Contributor

Let me take a final look, as there have been some changes since my last LGTM.

*
* File paths should be the same format as "input_file_name()" of the given relation type.
* For [[IcebergRelation]], each file path should be in this format:
* `file:/path/to/file`
Contributor


Can you update this based on the implementation below?
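
One quick way to check the exact format for a given relation is to inspect input_file_name() directly (iceDf as in the example at the top of this thread):

import org.apache.spark.sql.functions.input_file_name

// Show the file paths Spark reports for the Iceberg relation,
// to match the doc comment's format against.
iceDf.select(input_file_name()).distinct.show(false)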

@imback82 imback82 merged commit 29ebdde into microsoft:master Feb 22, 2021
@imback82
Contributor

@andrei-ionescu Can you do a follow-up PR to address #358 (comment)? Thanks!

@andrei-ionescu
Contributor Author

@imback82: I created #362 to address the doc comment.

@imback82, @sezruby, @rapoth: Thanks for the help with this PR & feature.

@rapoth
Contributor

rapoth commented Feb 22, 2021

It has been a fantastic collaboration so far and I have personally learned a lot in the process! Thank you @andrei-ionescu! 🙂

@sezruby
Collaborator

sezruby commented Feb 23, 2021


Labels

enhancement New feature or request


Development

[PROPOSAL]: Support Iceberg table format
[FEATURE REQUEST]: Add support for Iceberg table format
