@chunyang
Contributor

@chunyang chunyang commented Feb 27, 2021

The problem: Schema update options are not respected when temporary tables are used to load data into BigQuery (e.g., when the number of files to load necessitates multiple load jobs). Unlike query or load jobs, BigQuery copy jobs do not allow field addition or relaxation when appending data to an existing table.

The solution: Before starting the copy jobs that move data from the temporary tables to the final destination table, run zero-row load jobs against the destination table(s) using the temporary table schemas and the user-provided schema update options. These zero-row load jobs update the schema of the destination table(s), if needed, so that it accepts the data from the forthcoming copy jobs.

If no schema update options are configured, no zero-row load jobs are run; UpdateDestinationSchema and WaitForSchemaModJobs become no-ops.
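
As a rough illustration of the approach described above (not the code in this PR), the sketch below builds the request body for a zero-row load job using field names from the BigQuery `jobs.insert` REST resource. The helper name `build_schema_update_load_job`, the `empty_source_uri` parameter, the choice of `NEWLINE_DELIMITED_JSON`, and all project, dataset, table, and bucket names are assumptions made for illustration.

```python
from typing import Any, Dict, List, Optional


def build_schema_update_load_job(
    destination_table: Dict[str, str],
    temp_table_schema: Dict[str, Any],
    schema_update_options: List[str],
    empty_source_uri: str,
) -> Optional[Dict[str, Any]]:
    """Build a jobs.insert body for a zero-row load job (hypothetical helper).

    Returns None when no schema update options are configured, mirroring
    the no-op behavior described in the PR text.
    """
    if not schema_update_options:
        return None
    return {
        "configuration": {
            "load": {
                "destinationTable": destination_table,
                # Schema taken from the temporary table's load job.
                "schema": temp_table_schema,
                # Assumption: an empty object in GCS supplies zero rows.
                "sourceUris": [empty_source_uri],
                # Assumption: source format chosen only for this sketch.
                "sourceFormat": "NEWLINE_DELIMITED_JSON",
                # Append so that existing rows in the destination are kept.
                "writeDisposition": "WRITE_APPEND",
                "schemaUpdateOptions": list(schema_update_options),
            }
        }
    }


# Example usage with made-up identifiers.
job = build_schema_update_load_job(
    destination_table={
        "projectId": "my-project",
        "datasetId": "my_dataset",
        "tableId": "events",
    },
    temp_table_schema={
        "fields": [{"name": "date", "type": "DATE", "mode": "NULLABLE"}]
    },
    schema_update_options=["ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"],
    empty_source_uri="gs://my-bucket/tmp/empty.json",
)
```

Returning `None` for an empty option list keeps the caller's "skip the job" branch explicit instead of submitting a load job that cannot change anything.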

Summary of changes:


When using temporary tables to append data to an existing table, first
update the schema of the destination table if schema field addition
or relaxation is allowed in schemaUpdateOptions. This needs to be done
as a separate step because BQ copy jobs do not support schema updates
when appending to an existing table.

WIP because an empty file list does not work when submitting load jobs.
Pointing to an empty file in GCS does work, but that means the empty
file needs to be created.
@codecov

codecov bot commented Feb 27, 2021

Codecov Report

Merging #14113 (b9ff38e) into master (40eef35) will decrease coverage by 0.01%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master   #14113      +/-   ##
==========================================
- Coverage   83.39%   83.38%   -0.02%     
==========================================
  Files         469      469              
  Lines       58711    58783      +72     
==========================================
+ Hits        48962    49016      +54     
- Misses       9749     9767      +18     
Impacted Files Coverage Δ
...amples/snippets/transforms/aggregation/__init__.py
...dks/python/apache_beam/examples/wordcount_xlang.py
.../sdks/python/apache_beam/options/value_provider.py
.../examples/snippets/transforms/elementwise/regex.py
...ers/interactive/testing/integration/screen_diff.py
...python/apache_beam/examples/wordcount_xlang_sql.py
...d/srcs/sdks/python/apache_beam/io/filebasedsink.py
...che_beam/portability/api/beam_expansion_api_pb2.py
...he_beam/io/flink/flink_streaming_impulse_source.py
...ython/apache_beam/io/gcp/experimental/spannerio.py
... and 928 more

@chunyang chunyang force-pushed the cyang/copy-schema-relaxation branch from fc1f910 to 9d55947 Compare March 1, 2021 17:00
@chunyang chunyang marked this pull request as ready for review March 1, 2021 17:04
@chunyang chunyang changed the title [BEAM-11277] WIP: Respect schemaUpdateOptions even when using temporary tables [BEAM-11277] Respect schemaUpdateOptions during BigQuery load with temporary tables Mar 1, 2021
@chunyang
Contributor Author

chunyang commented Mar 1, 2021

R: @pabloem
R: @angoenka

@chunyang chunyang force-pushed the cyang/copy-schema-relaxation branch from 9d55947 to 9af141d Compare March 1, 2021 17:09
@chunyang chunyang force-pushed the cyang/copy-schema-relaxation branch from 9af141d to 911945c Compare March 1, 2021 21:43
@pabloem
Member

pabloem commented Mar 10, 2021

Run Python 3.8 PostCommit

Comment on lines +399 to +401
location=temp_table_load_job_reference.location)
temp_table_schema = temp_table_load_job.configuration.load.schema

Member

Does it make sense to compare the schema of the destination table with the schema of the temp table job? We'd save one load, right?

Contributor Author

Added a simple comparison of destination_table.schema == temp_table_schema. It will work for trivial cases but doesn't catch the cases where the order of fields in a record differs. E.g., the following schemas are different according to == even though the temp table can be directly appended to the destination table without error.

<TableSchema
  fields: [
    <TableFieldSchema fields: [], name: 'bytes', type: 'BYTES'>,
    <TableFieldSchema fields: [], name: 'date', type: 'DATE'>,
    <TableFieldSchema fields: [], name: 'time', type: 'TIME'>
  ]>
<TableSchema
  fields: [
    <TableFieldSchema fields: [], name: 'date', type: 'DATE'>,
    <TableFieldSchema fields: [], name: 'time', type: 'TIME'>,
    <TableFieldSchema fields: [], name: 'bytes', type: 'BYTES'>
  ]>

I can probably write a function to check the schema recursively but do you know if one already exists?
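
For illustration only, a recursive, order-insensitive comparison might look like the sketch below. It works on plain dicts shaped like the BigQuery REST `TableSchema` resource rather than on the `TableSchema` objects shown above. `schemas_match` and `_normalize` are hypothetical names, and the sketch assumes that a missing `mode` is equivalent to `NULLABLE`. It does not reconcile type aliases such as `INTEGER` and `INT64`.

```python
from typing import Any, Dict, List


def _normalize(fields: List[Dict[str, Any]]) -> List[tuple]:
    """Return the fields as a list sorted by name, recursing into records."""
    return sorted(
        (
            f["name"],
            f.get("type", ""),
            # Assumption: an unset mode means NULLABLE.
            f.get("mode") or "NULLABLE",
            # Nested RECORD fields are normalized the same way.
            _normalize(f.get("fields") or []),
        )
        for f in fields
    )


def schemas_match(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
    """Compare two schemas while ignoring field order at every level."""
    return _normalize(a.get("fields") or []) == _normalize(b.get("fields") or [])


# The two schemas from the comment above, written as plain dicts.
destination = {
    "fields": [
        {"name": "bytes", "type": "BYTES", "fields": []},
        {"name": "date", "type": "DATE", "fields": []},
        {"name": "time", "type": "TIME", "fields": []},
    ]
}
temp = {
    "fields": [
        {"name": "date", "type": "DATE", "fields": []},
        {"name": "time", "type": "TIME", "fields": []},
        {"name": "bytes", "type": "BYTES", "fields": []},
    ]
}

print(schemas_match(destination, temp))  # True
```

Sorting by field name before comparing is what makes the check order-insensitive. A plain `==` on the original objects compares the field lists position by position.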

apache#14113 (comment)

Reusing one single PCollection concentrates the different paths into
a single stage which complicates firing of triggers for the stage.
..when destination table schema matches temp table schema.
@pabloem
Member

pabloem commented Mar 23, 2021

Run Python 3.8 PostCommit

@pabloem
Member

pabloem commented Mar 23, 2021

@kmjung @vachan-shetty are you aware of a function in Beam or in BQ APIs that can help us compare two schemas?

@kmjung
Contributor

kmjung commented Mar 23, 2021

I'm not aware of anything like this in open source, no, although I will admit to not being very familiar with the Python SDK.

@pabloem pabloem merged commit dcd8778 into apache:master Mar 31, 2021
@pabloem
Member

pabloem commented Mar 31, 2021

Merging this for now. @chunyang feel free to add the schema matching functionality, or please create a JIRA issue if you can so we won't forget about it.

Thanks a lot for implementing this. It's great to have it.
