Fix UpdateSchemaDestination breaking DynamicDestination in Bigquery BatchLoad #25410
The current UpdateSchemaDestination implementation in Java does not account for dynamic destinations: it simply uses the first destination it sees, which causes several problems. The integration test that was supposed to cover this class did not work either. It should set
for (KV<DestinationT, WriteTables.Result> entry : element) {
  destination = entry.getKey();
  if (destination != null) {
I neglected the handling of a null destination in the original implementation. I do not see which scenario would produce a null destination (unless a user-provided DynamicDestination.getDestination returns null, but the documentation says it may not return null). Nevertheless, processElement in the upstream WriteTempTables essentially calls dynamicDestinations.getTable(destination) for every incoming element:
Line 221 in 78c1564
TableDestination tableDestination = dynamicDestinations.getTable(destination);
and requires it to return a non-null tableDestination (the same call is used in UpdateSchemaDestination). So I made the behavior here consistent with WriteTables.
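To illustrate the difference between using only the first destination and handling every destination, here is a minimal sketch in plain Java rather than Beam types. The class `PerDestinationSketch`, the `getTable` helper, and the string-based destinations are hypothetical stand-ins for `dynamicDestinations.getTable(destination)` and `TableDestination`; this is not the code in the PR.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class PerDestinationSketch {
  // Hypothetical stand-in for dynamicDestinations.getTable(destination).
  // Returns null for a null or unknown destination.
  static String getTable(Map<String, String> tables, String destination) {
    return destination == null ? null : tables.get(destination);
  }

  // Behavior described as the bug: only the first destination seen is considered.
  static Set<String> firstSeenOnly(
      Map<String, String> tables, List<Map.Entry<String, String>> element) {
    Set<String> out = new LinkedHashSet<>();
    if (!element.isEmpty()) {
      out.add(Objects.requireNonNull(getTable(tables, element.get(0).getKey())));
    }
    return out;
  }

  // Sketch of the fix: resolve a table for every entry and require it to be
  // non-null, so each distinct destination is handled exactly once.
  static Set<String> everyDestination(
      Map<String, String> tables, List<Map.Entry<String, String>> element) {
    Set<String> out = new LinkedHashSet<>();
    for (Map.Entry<String, String> entry : element) {
      String table =
          Objects.requireNonNull(
              getTable(tables, entry.getKey()), "getTable may not return null");
      out.add(table);
    }
    return out;
  }
}
```

With entries for two destinations in one bundle, `firstSeenOnly` yields one table while `everyDestination` yields both; a null or unknown destination fails fast with a NullPointerException instead of being silently skipped.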
…atchLoad
* Handle dynamic table destination in UpdateSchemaDestination impl
* Add ZERO_LOAD job type for schema update load
* Fix BigQuerySchemaUpdateOptionsIT to actually test temp tables scenario
  Rewrite BigQuerySchemaUpdateOptionsIT.runWriteTestTempTable to test dynamicDestination scenario
Assigning reviewers. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
ahmedabu98
left a comment
Thanks a lot for this fix! This unblocks a popular use case of large writes to dynamic destinations. Left a few comments; mainly, I think that seeing ZERO_LOAD in logs will be confusing to people who don't have context around this. Maybe a clearer name would help. Besides that, this PR looks great.
...cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
    Lists.newArrayList(zeroLoadJobIdPrefixView);
sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());

PCollection<TableDestination> successfulMultiPartitionWrites =
Would it make sense to add a GBK after writeTempTables? Per your comment on the issue, we'd end up with a PCollection<KV<DestinationT, Iterable<WriteTables.Result>>>. It would also protect writeTempTables against retries if UpdateSchemaDestination fails.
I see this would need some changes in WriteRename as well, perhaps this could be an improvement in a separate PR.
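For readers unfamiliar with the shape a GBK would produce here, the following is a rough plain-Java analogue, not Beam code. `GroupByDestinationSketch` and the string keys and values are hypothetical stand-ins for `DestinationT` and `WriteTables.Result`; an actual pipeline would use Beam's GroupByKey transform on the PCollection.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByDestinationSketch {
  // Collapses (destination, result) pairs into one entry per destination,
  // mirroring KV<DestinationT, Iterable<WriteTables.Result>>.
  static Map<String, List<String>> groupByDestination(
      List<Map.Entry<String, String>> results) {
    return results.stream()
        .collect(
            Collectors.groupingBy(
                Map.Entry::getKey,
                LinkedHashMap::new, // keep first-seen key order for readability
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
  }
}
```

After grouping, a downstream step sees each destination once together with all of its results, which is what would let a schema update run once per destination.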
After leaving that comment, I found that using a GBK may break the use case where the final destination table gets updated more than once, e.g. in streaming file loads. With careful windowing we could avoid that, but I have not looked deeply into it yet. For now I am leaving the overall structure of the pipeline unchanged.
using a gbk may break the use case when the final destination table gets updated more than once, e.g. in streaming file load
Ideally, this check would prevent that if it worked as intended:
Line 263 in feb248a
    || destinationTable.getSchema().equals(schema)
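As a rough illustration of how such a check could make repeated updates harmless, here is a sketch in plain Java. `SchemaCheckSketch`, `needsSchemaUpdate`, and the representation of a schema as a list of field names are assumptions for illustration; only the `equals` comparison comes from the line quoted above, and the handling of a missing or empty schema is a guess at the surrounding condition.

```java
import java.util.List;

public class SchemaCheckSketch {
  // Returns true only when a schema-update load would actually change the table.
  static boolean needsSchemaUpdate(List<String> existingSchema, List<String> newSchema) {
    // Assumption: a missing table or empty schema needs no update ahead of the load.
    if (existingSchema == null || existingSchema.isEmpty()) {
      return false;
    }
    // If the schema already matches, a second update for the same table is a no-op.
    return !existingSchema.equals(newSchema);
  }
}
```

Under this sketch, a destination that is written more than once triggers a schema update only the first time the schema differs.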
Yeah, agreed, ideally. I'm just not confident enough, and I want to keep the change limited to what is necessary to fix the bug (though the change is already not minor).
ahmedabu98
left a comment
LGTM if tests pass
Thanks, https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java17_Commit/5611/ passed, though the status was not updated in the GitHub UI.
Fixes #25355
Handle dynamic table destinations in UpdateSchemaDestination impl
Add ZERO_LOAD job type for schema update load
Fix BigQuerySchemaUpdateOptionsIT to actually test temp tables scenario
Rewrite BigQuerySchemaUpdateOptionsIT.runWriteTestTempTable to test dynamicDestination scenario