Skip to content

Conversation

@nehsyc
Copy link
Contributor

@nehsyc nehsyc commented Feb 22, 2021

This is a parity of Java changes (#13496 and #13859).

Use GroupIntoBatches.WithShardedKey API to group and batch write before streaming to BigQuery service. Currently batching is done best-effort upon bundle finalization.

This PR adds an option to WriteToBigQuery to toggle between the existing and new implementation for both streaming inserts and file loads.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status Build Status Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@nehsyc nehsyc force-pushed the bq-integration-python branch from bbdb0af to 690790d Compare February 22, 2021 07:21
@codecov
Copy link

codecov bot commented Feb 22, 2021

Codecov Report

Merging #14033 (1e52ae9) into master (aaad864) will increase coverage by 0.00%.
The diff coverage is n/a.

Impacted file tree graph

@@           Coverage Diff           @@
##           master   #14033   +/-   ##
=======================================
  Coverage   82.98%   82.99%           
=======================================
  Files         469      469           
  Lines       58298    58330   +32     
=======================================
+ Hits        48379    48409   +30     
- Misses       9919     9921    +2     
Impacted Files Coverage Δ
sdks/python/apache_beam/portability/__init__.py
.../python/apache_beam/io/gcp/tests/pubsub_matcher.py
sdks/python/apache_beam/error.py
...s/python/apache_beam/examples/snippets/snippets.py
...on/apache_beam/portability/api/metrics_pb2_urns.py
...n/apache_beam/runners/dataflow/dataflow_metrics.py
...ython/apache_beam/io/external/generate_sequence.py
...ython/apache_beam/typehints/decorators_test_py3.py
.../examples/snippets/transforms/elementwise/pardo.py
sdks/python/apache_beam/typehints/decorators.py
... and 925 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update aaad864...1e52ae9. Read the comment docs.

@nehsyc
Copy link
Contributor Author

nehsyc commented Feb 22, 2021

R: @pabloem
CC: @reuvenlax

@pabloem
Copy link
Member

pabloem commented Feb 22, 2021

Run Python PreCommit

Copy link
Member

@pabloem pabloem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some lint issues here: https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Commit/8336/console

Specifically

00:02:47.755 apache_beam/io/gcp/bigquery.py:1363:0: C0301: Line too long (102/80) (line-too-long)
00:02:47.755 apache_beam/io/gcp/bigquery.py:1361:0: W0105: String statement has no effect (pointless-string-statement)

I think you can just document those constants with the usual # comment instead of the """" comment """ ones.

Also, I guess you've figured this out, but it will be good if you add an integration test. You can find those mainly in bigquery_test.py, they're tagged with @attr('IT')

as a :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
or a python dictionary, or the string or dictionary itself,
bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be causing the docs test failure

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the test suite:

00:02:30.217 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_PythonDocs_Commit/src/sdks/python/test-suites/tox/pycommon/build/srcs/sdks/python/apache_beam/io/gcp/bigquery.py:docstring of apache_beam.io.gcp.bigquery.WriteToBigQuery:28: WARNING: py:class reference target not found: apache_beam.io.gcp.internal.clients.bigquery.        bigquery_v2_messages.TableSchema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out! Fixed and pushed a new commit.

@nehsyc nehsyc force-pushed the bq-integration-python branch from 04ef413 to 3526cdb Compare February 22, 2021 22:17
@pabloem
Copy link
Member

pabloem commented Feb 22, 2021

Run Python 3.8 PostCommit

@nehsyc nehsyc force-pushed the bq-integration-python branch from 3526cdb to 3f869ac Compare February 23, 2021 01:35
@nehsyc nehsyc force-pushed the bq-integration-python branch from 3f869ac to d8fe073 Compare February 23, 2021 01:42
@nehsyc nehsyc force-pushed the bq-integration-python branch from 9c4156e to d499460 Compare February 23, 2021 19:00
'triggering_frequency': self.triggering_frequency,
'validate': self._validate,
'temp_file_format': self._temp_file_format,
'ignore_insert_ids': self._ignore_insert_ids,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore_insert_ids was previously not added to runner api parameter. Wondering if this is a bug as when we convert from the runner api the parameter will be lost.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah thanks Siyuan!

max_file_size=20,
max_files_per_bundle=-1))

@parameterized.expand(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not able to run this with Dataflow runner. I can revert it back if more appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted the test as I still couldn't run it successfully even with local Dataflow runner. I will follow up in a separate PR if needed.

'triggering_frequency': self.triggering_frequency,
'validate': self._validate,
'temp_file_format': self._temp_file_format,
'ignore_insert_ids': self._ignore_insert_ids,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah thanks Siyuan!

@nehsyc
Copy link
Contributor Author

nehsyc commented Feb 24, 2021

Thanks for the review!

I got the following lint errors which are supposed to be suppressed. Any ideas?

17:47:48 ERROR: /home/jenkins/jenkins-slave/workspace/beam_PreCommit_PythonLint_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/io/gcp/bigquery_test.py Imports are incorrectly sorted.
17:47:48 --- /home/jenkins/jenkins-slave/workspace/beam_PreCommit_PythonLint_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/io/gcp/bigquery_test.py:before	2021-02-23 01:43:30.069624
17:47:48 +++ /home/jenkins/jenkins-slave/workspace/beam_PreCommit_PythonLint_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/io/gcp/bigquery_test.py:after	2021-02-23 01:47:43.161598
17:47:48 @@ -38,6 +38,10 @@
17:47:48  import mock
17:47:48  import pytz
17:47:48  from nose.plugins.attrib import attr
17:47:48 +# Protect against environments where bigquery library is not available.
17:47:48 +# pylint: disable=wrong-import-order, wrong-import-position
17:47:48 +from parameterized import param
17:47:48 +from parameterized import parameterized

@pabloem
Copy link
Member

pabloem commented Feb 24, 2021

I think you can just move lines 80-81 to be 41-42, and that should be enough?

@nehsyc nehsyc force-pushed the bq-integration-python branch from 968f4fd to 1e52ae9 Compare February 24, 2021 06:24
@pabloem
Copy link
Member

pabloem commented Feb 24, 2021

Run Python 3.8 PostCommit

1 similar comment
@pabloem
Copy link
Member

pabloem commented Feb 24, 2021

Run Python 3.8 PostCommit

@pabloem pabloem merged commit 70f2aeb into apache:master Feb 24, 2021
@pabloem
Copy link
Member

pabloem commented Feb 24, 2021

thanks @nehsyc ! this will be in Beam 2.29.0 - excited to set it to the wild : )

@nehsyc nehsyc changed the title [BEAM-11408] Integrate Python BigQuery sink with GroupIntoBatches [BEAM-11408, BEAM-11772] Integrate Python BigQuery sink with GroupIntoBatches Mar 2, 2021
kennknowles added a commit to kennknowles/beam that referenced this pull request Mar 4, 2021
…gger

* github/master: (123 commits)
  [BEAM-11899] Bump commons-pool to 2.8.1 and bump commons-dbcp to 2.8.0, Because there is a library dependency
  Update pillars.yaml (apache#14142)
  [BEAM-10632] Checkerframework nullness cleanups (apache#14107)
  [BEAM-11213] Instantiate SparkListenerApplicationStart in a Spark 3 compatible way
  Fix typos for excluding testMergingCustomWindowsWithoutCustomWindowTypes
  Specify the time resolution for TestStreamPayload.
  [BEAM-10961] Enable strict depdency checking for sdks/java/extensions/euphoria (second attempt)
  [BEAM-11848] Store Docker images in a variable for consistency.
  Splitting old Go Precommit and new ULR integration test precommit.
  Moving runner imports out of ptest.
  Add the TO_STRING capability to Java and Python
  [BEAM-11848] Fix Docker images list.
  jdbc python supported Dataflow runner (apache#13960)
  Adding a warning to use multi-workers on FnApiRunner
  Fix legend for Python Directrunner microbenchmarks
  [BEAM-11740] Estimate PCollection byte size
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/zetasketch (apache#14093)
  Map Dataflow JOB_STATE_CANCELLING to Beam RUNNING state
  [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker
  [BEAM-10761] add reference to BEAM-11761
  [BEAM-10961] enable strict dependency checking for flink/job-server
  Exclude MapState example integration tests from Dataflow runner v2 suite
  Remove InvalidWindows from Java SDK, instead track "already merged" bit
  Fix checkstyle in watermark latency benchmark
  Fix compile breakage in WindmillStateInternals
  Improve test, error on ALREADY_MERGED.
  [BEAM-10961] Strict dependency checking for sdks/java/io/gcp (apache#13791)
  Initial watermark latency benchmark
  Attempting improvements on DirectRunner Python dash
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/google-cloud-platform-core (apache#14084)
  Merge pull request apache#13802: [BEAM-1474]. Adding MapState and SetState support for the Dataflow runner
  Remove some false positives
  Remove nullness warning suppression
  [BEAM-11861] Add methods to explicitly provide coder for ParquetIO's Parse and ParseFiles (apache#14078)
  [BEAM-11531] Use pandas 1.2 for python>=3.7 (apache#14099)
  [BEAM-10961] add reference to BEAM-11761
  [BEAM-10961] add explicit compile for auto_value_annotations in sdks/extensions/ml/build.gradle
  Attempting improvements on DirectRunner Python dash
  Recognize JOB_STATE_PENDING from Dataflow and map to RUNNING
  never run checkerframework on tests
  Puts more expensive BQ empty table check to the right of the 'and' condition (apache#14094)
  Use the windowing strategy of the input, not output, PCollection of GBK.
  Do not stage dataflow worker jar when use runner_v2.
  [BEAM-11870] Re-raise underlying exception for InvocationTargetException (apache#14098)
  [BEAM-11778] Create a wrapper for ZetaSQL catalog and refactor accordingly. (apache#13934)
  [BEAM-9378] Add ignored tests which fail in various ways when querying nested structures (apache#14077)
  Merge Fn API and runner v2 configurations for DataflowRunner
  Fix up! formatting
  Add validate runner test for testing custom merging windows fn without custom window types
  Revert "Revert "[BEAM-2914] Add portable merging window support to Python. (apache#12995)""
  [BEAM-10961] fix stray reordering of lines
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/sorter
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/sketching
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/schemaio-expansion-service
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/protobuf
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/ml
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/kyro
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/join-library
  [BEAM-10961] undo line moves (originally intended for alphabeticization)
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/jackson
  [BEAM-10961] Enable strict dependency checking on sdks/java/extensions/sql (apache#13830)
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/euphoria
  [BEAM-10961] enable strict dependency checking for sdks/java/io/parquet (apache#14062)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/thrift (apache#14066)
  Refactor ZetaSqlDialectSpecTest and add some passing tests. (apache#14080)
  [BEAM-11864] Use objects.equals instead of raw comparison
  [BEAM-11707] Change WindmillStateCache cache invalidation to be based upon reference invalidation instead of expensive set management. Reduce operations of shared cache by caching per-key object sets locally and flushing as groups to shared cache. Remove byte tracking which could be racy based upon background evictions in favor of just iterating for rendering the status page. This also lets us capture more stats.
  [BEAM-11730] Reduce context switching overhead for appliance reads by issuing reads directly from calling threads in the case that there is no reads being queued.
  Fix preview
  Show string from Dataflow service when job terminates in unrecognized state
  Log a warning when Dataflow returns an unrecognized state
  Merge pull request apache#14033 from [BEAM-11408] Integrate Python BigQuery sink with GroupIntoBatches
  Remove SYNCHRONIZED_PROCESSING_TIME from model proto
  Remove use of model SYNCHRONIZED_PROCESSING_TIME
  Merge redundant model feature columns in capability matrix
  Remove MapReduce runner from capability matrix, because it is on a branch and unreleased
  Remove JStorm runner from capability matrix, because it is on a branch and unreleased
  Remove retractions from capability matrix, because they do not exist yet
  Remove metadata-driven triggers from capability matrix, because they do not exist
  [BEAM-10937] Add Tour of Beam page (apache#13747)
  [BEAM-11344] Apply "Become a Committer" changes from Website Revamp (apache#14036)
  Merge pull request apache#14046 from [BEAM-11791] Adding a microbenchmark for TestStream
  Returning successful writes in FhirIO.Write.Result (apache#14034)
  Fixup
  [BEAM-10961] enable strict dependency checking for sdks/java/io/file-based-io-tests (apache#14052)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/contextualtextio (apache#14049)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/kinesis (apache#14058)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/bigquery-io-perf-tests (apache#14048)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/elasticsearch (apache#14050)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/expansion-service (apache#14051)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/jdbc (apache#14055)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/jms (apache#14056)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/kafka (apache#14057)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/hcatalog (apache#14053)
  [BEAM-11859] Fixed bug in python S3 IO
  [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer
  [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer
  [BEAM-10961] fix spacing
  [BEAM-10961] enable strict dependency checking for sdks/java/io/xml
  [BEAM-10961] enable strict dependency checking for sdks/java/io/tika
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants