
Conversation

Fokko (Contributor) commented Dec 20, 2020

Exploring how to write Iceberg tables using Beam.

By removing the PDone and instead emitting the files that are written, we can add those files to a table format such as Iceberg, Hudi, or Delta.

I'm still exploring the possible approaches, but I think this is a very fruitful direction, since we can fully reuse the existing Avro/Parquet/etc. writers and emit the WriteFilesResult to the next operator, which appends the new files to the Iceberg log.

My suggestion would be to change the API in Apache Beam so we can use the WriteFilesResult, and add the Iceberg extension in the Iceberg repository itself. This will be a PTransform<WriteFilesResult<?>, PDone>. Maybe I'll change the PDone to a Table (an Iceberg Table) as well, so you can signal downstream consumers. I'll open that PR sometime next week, but I would like to hear your ideas as well! :)
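
To make the direction concrete, here is a rough sketch of what such a transform could look like. This is only an illustration, not the actual change: the class name AppendToIcebergTable is hypothetical, it assumes an Iceberg HadoopTables catalog, it hardcodes the file metrics a real implementation would collect from the writers, and it commits per file where a real implementation would gather all file names into a single atomic append.

import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PDone;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

/** Hypothetical sketch: append files emitted by a Beam write to an Iceberg table. */
class AppendToIcebergTable extends PTransform<WriteFilesResult<Void>, PDone> {

  private final String tableLocation;

  AppendToIcebergTable(String tableLocation) {
    this.tableLocation = tableLocation;
  }

  @Override
  public PDone expand(WriteFilesResult<Void> writtenFiles) {
    writtenFiles
        // The file names produced by the upstream write.
        .getPerDestinationOutputFilenames()
        .apply(Values.create())
        .apply(ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(@Element String path) {
            Table table = new HadoopTables(new Configuration()).load(tableLocation);
            DataFile dataFile = DataFiles.builder(table.spec())
                .withPath(path)
                .withFormat(FileFormat.AVRO)
                .withFileSizeInBytes(1L) // placeholder: use the real file size
                .withRecordCount(1L)     // placeholder: use the real record count
                .build();
            // Appends the new file to the Iceberg log; a real implementation
            // would collect all paths and commit them once, atomically.
            table.newAppend().appendFile(dataFile).commit();
          }
        }));
    return PDone.in(writtenFiles.getPipeline());
  }
}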


Fokko (Contributor, Author) commented Dec 21, 2020

R: @iemejia 👍

ghost commented Dec 21, 2020

Run Spark StructuredStreaming ValidatesRunner

iemejia (Member) commented Dec 21, 2020

@tszerszen The Spark Structured Streaming ValidatesRunner tests are failing, so I think we can ignore those for the moment.

iemejia (Member) left a comment

Hello Fokko!
Great to see you contributing to Beam, and I'm super excited to have Iceberg support soon. Don't hesitate to share more details (or an early WIP PR).

I left one question: do you think you can get ahead without changing AvroIO.Write? If so, just revert that part of the changes and I will merge.

-  public PDone expand(PCollection<T> input) {
-    input.apply(inner);
-    return PDone.in(input.getPipeline());
+  public WriteFilesResult<?> expand(PCollection<T> input) {
A reviewer (Member) commented on the change:

This solution looks OK, but I am wary of the consequences of changing the return type. It looks like we introduced TypedWrite on Beam to achieve the same goal without breaking backwards compatibility.

The 'modern', preferred way to write files on Beam is via FileIO.write(), which already returns WriteFilesResult.
Can you check whether we can achieve the intended results by relying on FileIO.write() + AvroIO.sink(), or is there anything missing?

CC: @jkff in case you have some extra detail to add.
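
For illustration, a minimal sketch of that combination, assuming GenericRecord elements with a known Avro schema; the class, method, and parameter names are illustrative:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.values.PCollection;

class AvroFileIOExample {
  // FileIO.write() + AvroIO.sink() already returns WriteFilesResult, so the
  // written file names stay available to a downstream Iceberg-commit
  // transform without any change to AvroIO.Write.
  static WriteFilesResult<Void> writeAvro(
      PCollection<GenericRecord> records, Schema schema, String outputPrefix) {
    return records.apply(
        FileIO.<GenericRecord>write()
            .via(AvroIO.sink(schema))
            .to(outputPrefix)
            .withSuffix(".avro"));
  }
}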

A reviewer (Contributor) replied:

Agreed, this should use FileIO.write + AvroIO.sink - the current change is incompatible and will break anybody's transforms of the form:

PDone expand(...) {
  ...
  return AvroIO.write()...;  // If return type changes to WFR, this stops compiling
}
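
For context, the TypedWrite path mentioned above keeps the existing PDone-returning AvroIO.write() intact while still exposing the file names to callers who ask for them. A hedged sketch, assuming AvroIO.Write offers withOutputFilenames() in the same way TextIO.Write does, and using a hypothetical MyEvent element class:

import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.values.PCollection;

class TypedWriteExample {
  // Hypothetical element type; AvroIO derives its schema via reflection.
  static class MyEvent {
    long id;
    String name;
  }

  // withOutputFilenames() switches from AvroIO.Write (PDone) to
  // AvroIO.TypedWrite, whose expansion returns WriteFilesResult, so callers
  // opt in without breaking existing PDone-based code.
  static WriteFilesResult<Void> writeEvents(
      PCollection<MyEvent> events, String outputPrefix) {
    return events.apply(
        AvroIO.write(MyEvent.class)
            .to(outputPrefix)
            .withSuffix(".avro")
            .withOutputFilenames());
  }
}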


   @Override
-  public PTransform<PCollection<Row>, POutput> buildWriter() {
+  public PTransform<PCollection<Row>, WriteFilesResult<?>> buildWriter() {
A reviewer (Member) commented on the change:

This looks good; here the change causes fewer issues since this class is @Internal.
CC @TheNeuralBit for awareness.

A reviewer (Member) replied:

Ack, thank you

 * A {@link PTransform} that writes to a {@link FileBasedSink}. A write begins with a sequential
 * global initialization of a sink, followed by a parallel write, and ends with a sequential
-* finalization of the write. The output of a write is {@link PDone}.
+* finalization of the write. The output of a write is {@link WriteFilesResult} with the files
A reviewer (Member) commented on the change:

👍

Fokko (Contributor, Author) commented Dec 21, 2020

Thanks for the pointers. It turns out this wasn't needed: I'm able to wire everything together using the FileIO API. A first attempt is in apache/iceberg#1972.

Fokko closed this Dec 21, 2020
iemejia (Member) commented Dec 22, 2020

Thanks Fokko, I will keep track of this on the Iceberg side; do not hesitate to ping me if anything is needed!
