From d98f1d157ea51593bd1b44e71aa646713572bc19 Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Sat, 27 Feb 2021 01:13:39 -0800
Subject: [PATCH 1/9] Iterate over temp_tables_pc only once

---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 298ead6b107e..7b6981f8587e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -900,10 +900,13 @@ def _load_data(
         beam.pvalue.AsList(destination_copy_job_ids_pc)))

     _ = (
-        finished_copy_jobs_pc
+        p
+        | "RemoveTempTables/Impulse" >> beam.Create([None])
         | "RemoveTempTables/PassTables" >> beam.FlatMap(
-            lambda x,
+            lambda _,
+            unused_copy_jobs,
             deleting_tables: deleting_tables,
+            pvalue.AsIter(finished_copy_jobs_pc),
             pvalue.AsIter(temp_tables_pc))
         | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
         | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()

From 37adbfdd8dbd655b919b3031a2d08273759bbd82 Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Sat, 27 Feb 2021 01:19:12 -0800
Subject: [PATCH 2/9] WIP: Update destination table schemas

When using temporary tables to append data to an existing table, first
update the schema of the destination table if schema field addition or
relaxation is allowed in schemaUpdateOptions. This needs to be done as a
separate step because BQ copy jobs do not support schema update when
appending to an existing table.

WIP because an empty files list does not work when submitting load
jobs. Pointing to an empty file in GCS does work, but that means this
empty file needs to be created first.
---
 .../apache_beam/io/gcp/bigquery_file_loads.py | 161 ++++++++++++++++--
 1 file changed, 150 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 7b6981f8587e..d9a28cb78472 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -50,6 +50,13 @@
 from apache_beam.transforms.util import GroupIntoBatches
 from apache_beam.transforms.window import GlobalWindows

+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  pass
+
 _LOGGER = logging.getLogger(__name__)

 ONE_TERABYTE = (1 << 40)
@@ -312,6 +319,109 @@ def process(self, element, file_prefix, *schema_side_inputs):
     yield (destination, (file_path, file_size))


+class UpdateDestinationSchema(beam.DoFn):
+  """Update destination schema based on data that is about to be copied into it.
+
+  Unlike load and query jobs, BigQuery copy jobs do not support schema field
+  addition or relaxation on the destination table. This DoFn fills that gap by
+  updating the destination table schemas to be compatible with the data coming
+  from the source table so that schema field modification options are respected
+  regardless of whether data is loaded directly to the destination table or
+  loaded into temporary tables before being copied into the destination.
+
+  This transform takes as input a (destination, job_reference) pair where the
+  job_reference refers to a completed load job into a temporary table.
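+
+  For example, an input element might look like this (table spec and job ID
+  are illustrative placeholders, not values produced by the pipeline):
+
+    ('myproject:mydataset.mytable',
+     bigquery.JobReference(projectId='myproject', jobId='temp_load_0001'))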
+
+  This transform emits (destination, job_reference) pairs where the
+  job_reference refers to a submitted load job for performing the schema
+  modification. Note that the input and output job references are not the
+  same.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  def __init__(
+      self,
+      write_disposition=None,
+      test_client=None,
+      additional_bq_parameters=None,
+      step_name=None):
+    self._test_client = test_client
+    self._write_disposition = write_disposition
+    self._additional_bq_parameters = additional_bq_parameters or {}
+    self._step_name = step_name
+
+  def setup(self):
+    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+    self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
+  def process(self, element, schema_mod_job_name_prefix):
+    destination = element[0]
+    temp_table_load_job_reference = element[1]
+
+    if callable(self._additional_bq_parameters):
+      additional_parameters = self._additional_bq_parameters(destination)
+    elif isinstance(self._additional_bq_parameters, vp.ValueProvider):
+      additional_parameters = self._additional_bq_parameters.get()
+    else:
+      additional_parameters = self._additional_bq_parameters
+
+    # When writing to normal tables WRITE_TRUNCATE will overwrite the schema
+    # but when writing to a partition, care needs to be taken to update the
+    # schema even on WRITE_TRUNCATE.
+    if (self._write_disposition not in ('WRITE_TRUNCATE', 'WRITE_APPEND') or
+        not additional_parameters or
+        not additional_parameters.get("schemaUpdateOptions")):
+      # No need to modify schema of destination table
+      return
+
+    table_reference = bigquery_tools.parse_table_reference(destination)
+    if table_reference.projectId is None:
+      table_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+
+    try:
+      # Check if destination table exists
+      _ = self._bq_wrapper.get_table(
+          project_id=table_reference.projectId,
+          dataset_id=table_reference.datasetId,
+          table_id=table_reference.tableId)
+    except HttpError as exn:
+      if exn.status_code == 404:
+        # Destination table does not exist, so no need to modify its schema
+        # ahead of the copy jobs.
+        return
+      else:
+        raise
+
+    temp_table_load_job = self._bq_wrapper.get_job(
+        project=temp_table_load_job_reference.projectId,
+        job_id=temp_table_load_job_reference.jobId,
+        location=temp_table_load_job_reference.location)
+    temp_table_schema = temp_table_load_job.configuration.load.schema
+
+    destination_hash = _bq_uuid(
+        '%s:%s.%s' % (
+            table_reference.projectId,
+            table_reference.datasetId,
+            table_reference.tableId))
+    uid = _bq_uuid()
+    job_name = '%s_%s_%s' % (schema_mod_job_name_prefix, destination_hash, uid)
+
+    _LOGGER.debug(
+        'Triggering schema modification job %s on %s',
+        job_name,
+        table_reference)
+    schema_update_job_reference = self._bq_wrapper.perform_load_job(
+        destination=table_reference,
+        files=[],  # FIXME: Load configuration must specify at least one source URI
+        job_id=job_name,
+        schema=temp_table_schema,
+        write_disposition='WRITE_APPEND',
+        create_disposition='CREATE_NEVER',
+        additional_load_parameters=additional_parameters,
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels())
+    yield (destination, schema_update_job_reference)
+
+
 class TriggerCopyJobs(beam.DoFn):
   """Launches jobs to copy from temporary tables into the main target table.
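Note on the schemaUpdateOptions gate above: it is driven by ordinary
load-job configuration supplied by the caller. A sketch of the shape a
WriteToBigQuery user would pass (the option strings are BigQuery's
documented values; the dict shape mirrors the integration test later in
this series):

    additional_bq_parameters = {
        # Allow adding new NULLABLE fields and/or relaxing REQUIRED
        # fields to NULLABLE on the destination table.
        'schemaUpdateOptions': [
            'ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'
        ],
    }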
@@ -351,7 +461,7 @@ def start_bundle(self):
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)

-  def process(self, element, job_name_prefix=None):
+  def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None):
     destination = element[0]
     job_reference = element[1]

@@ -840,6 +950,7 @@ def _load_data(
       partitions_using_temp_tables,
       partitions_direct_to_destination,
       load_job_name_pcv,
+      schema_mod_job_name_pcv,
       copy_job_name_pcv,
       p,
       step_name):
@@ -858,6 +969,8 @@ def _load_data(
     of the load jobs would fail but not the others. If any of them fails, then
     copy jobs are not triggered.
     """
+    singleton_pc = p | "ImpulseLoadData" >> beam.Create([None])
+
     # Load data using temp tables
     trigger_loads_outputs = (
         partitions_using_temp_tables
@@ -878,30 +991,47 @@ def _load_data(
     temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]

-    destination_copy_job_ids_pc = (
-        p
-        | "ImpulseMonitorLoadJobs" >> beam.Create([None])
+    finished_temp_tables_load_jobs_pc = (
+        singleton_pc
         | "WaitForTempTableLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(temp_tables_load_job_ids_pc))
+
+    schema_mod_job_ids_pc = (
+        finished_temp_tables_load_jobs_pc
+        | beam.ParDo(
+            UpdateDestinationSchema(
+                write_disposition=self.write_disposition,
+                test_client=self.test_client,
+                additional_bq_parameters=self.additional_bq_parameters,
+                step_name=step_name),
+            schema_mod_job_name_pcv))
+
+    finished_schema_mod_jobs_pc = (
+        singleton_pc
+        | "WaitForSchemaModJobs" >> beam.ParDo(
+            WaitForBQJobs(self.test_client),
+            beam.pvalue.AsList(schema_mod_job_ids_pc)))
+
+    destination_copy_job_ids_pc = (
+        finished_temp_tables_load_jobs_pc
         | beam.ParDo(
             TriggerCopyJobs(
                 create_disposition=self.create_disposition,
                 write_disposition=self.write_disposition,
                 test_client=self.test_client,
                 step_name=step_name),
-            copy_job_name_pcv))
+            copy_job_name_pcv,
+            beam.pvalue.AsIter(finished_schema_mod_jobs_pc)))

     finished_copy_jobs_pc = (
-        p
-        | "ImpulseMonitorCopyJobs" >> beam.Create([None])
+        singleton_pc
         | "WaitForCopyJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(destination_copy_job_ids_pc)))

     _ = (
-        p
-        | "RemoveTempTables/Impulse" >> beam.Create([None])
+        singleton_pc
         | "RemoveTempTables/PassTables" >> beam.FlatMap(
             lambda _,
             unused_copy_jobs,
             deleting_tables: deleting_tables,
@@ -931,8 +1061,7 @@ def _load_data(
             *self.schema_side_inputs))

     _ = (
-        p
-        | "ImpulseMonitorDestLoadJobs" >> beam.Create([None])
+        singleton_pc
         | "WaitForDestinationLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(destination_load_job_ids_pc)))
@@ -964,6 +1093,14 @@ def expand(self, pcoll):
             lambda _: _generate_job_name(
                 job_name, bigquery_tools.BigQueryJobTypes.LOAD, 'LOAD_STEP')))

+    schema_mod_job_name_pcv = pvalue.AsSingleton(
+        singleton_pc
+        | "SchemaModJobNamePrefix" >> beam.Map(
+            lambda _: _generate_job_name(
+                job_name,
+                bigquery_tools.BigQueryJobTypes.LOAD,
+                'SCHEMA_MOD_STEP')))
+
     copy_job_name_pcv = pvalue.AsSingleton(
         singleton_pc
         | "CopyJobNamePrefix" >> beam.Map(
@@ -1020,6 +1157,7 @@ def expand(self, pcoll):
             self._load_data(all_partitions,
                             empty_pc,
                             load_job_name_pcv,
+                            schema_mod_job_name_pcv,
                             copy_job_name_pcv,
                             p,
                             step_name))
@@ -1028,6 +1166,7 @@ def expand(self, pcoll):
             self._load_data(multiple_partitions_per_destination_pc,
                             single_partition_per_destination_pc,
                             load_job_name_pcv,
+                            schema_mod_job_name_pcv,
                             copy_job_name_pcv,
                             p,
                             step_name))
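The wiring above leans on a recurring pattern: WaitForBQJobs consumes the
job-ID PCollection as an AsList side input, so the waiting step cannot run
until every upstream job reference has been emitted, and everything
downstream of it is therefore sequenced after those jobs. A minimal
runnable sketch of the pattern, with a hypothetical stand-in DoFn in place
of WaitForBQJobs:

    import apache_beam as beam

    class WaitThenEmit(beam.DoFn):
      def process(self, unused_element, job_ids):
        # The AsList side input is only ready once the job-ID PCollection
        # is complete, so this DoFn (and anything downstream of it) runs
        # strictly after all jobs have been triggered.
        yield list(job_ids)

    with beam.Pipeline() as p:
      job_ids = p | 'FakeJobIDs' >> beam.Create(['job-1', 'job-2'])
      done = (
          p
          | 'Impulse' >> beam.Create([None])
          | 'WaitForJobs' >> beam.ParDo(
              WaitThenEmit(), beam.pvalue.AsList(job_ids)))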
From de0d90c12a0ce626ff0038429f263399622c1a97 Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Sat, 27 Feb 2021 01:24:50 -0800
Subject: [PATCH 3/9] Consistency: Use pvalue rather than beam.pvalue

---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index d9a28cb78472..95d80a1d30f7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -995,7 +995,7 @@ def _load_data(
         singleton_pc
         | "WaitForTempTableLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
-            beam.pvalue.AsList(temp_tables_load_job_ids_pc))
+            pvalue.AsList(temp_tables_load_job_ids_pc)))

     schema_mod_job_ids_pc = (
         finished_temp_tables_load_jobs_pc
@@ -1011,7 +1011,7 @@ def _load_data(
         singleton_pc
         | "WaitForSchemaModJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
-            beam.pvalue.AsList(schema_mod_job_ids_pc)))
+            pvalue.AsList(schema_mod_job_ids_pc)))

     destination_copy_job_ids_pc = (
         finished_temp_tables_load_jobs_pc
@@ -1022,13 +1022,13 @@ def _load_data(
                 test_client=self.test_client,
                 step_name=step_name),
             copy_job_name_pcv,
-            beam.pvalue.AsIter(finished_schema_mod_jobs_pc)))
+            pvalue.AsIter(finished_schema_mod_jobs_pc)))

     finished_copy_jobs_pc = (
         singleton_pc
         | "WaitForCopyJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
-            beam.pvalue.AsList(destination_copy_job_ids_pc)))
+            pvalue.AsList(destination_copy_job_ids_pc)))

     _ = (
         singleton_pc
@@ -1064,7 +1064,7 @@ def _load_data(
         singleton_pc
         | "WaitForDestinationLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
-            beam.pvalue.AsList(destination_load_job_ids_pc)))
+            pvalue.AsList(destination_load_job_ids_pc)))

     destination_load_job_ids_pc = (
         (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)

From 91e9a15ecb97fb398635d15daedc67c2e68b4089 Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Sun, 28 Feb 2021 19:50:52 -0800
Subject: [PATCH 4/9] Trigger schema update load job using empty file

---
 .../apache_beam/io/gcp/bigquery_file_loads.py | 11 ++--
 .../apache_beam/io/gcp/bigquery_tools.py      | 54 ++++++++++++++++---
 2 files changed, 53 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 95d80a1d30f7..15b2637b4d57 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -31,6 +31,7 @@
 from __future__ import absolute_import

 import hashlib
+import io
 import logging
 import random
 import time
@@ -410,9 +411,11 @@ def process(self, element, schema_mod_job_name_prefix):
         'Triggering schema modification job %s on %s',
         job_name,
         table_reference)
+    # Trigger potential schema modification by loading zero rows into the
+    # destination table with the temporary table schema.
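+    # A zero-byte source stream produces a zero-row load job, so the job's
+    # only observable effect is applying schemaUpdateOptions to the
+    # destination table's schema; no rows are appended by it.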
     schema_update_job_reference = self._bq_wrapper.perform_load_job(
         destination=table_reference,
-        files=[],  # FIXME: Load configuration must specify at least one source URI
+        source_stream=io.BytesIO(),  # file with zero rows
         job_id=job_name,
         schema=temp_table_schema,
         write_disposition='WRITE_APPEND',
         create_disposition='CREATE_NEVER',
@@ -632,9 +635,9 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
       if not self.bq_io_metadata:
         self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
       job_reference = self.bq_wrapper.perform_load_job(
-          table_reference,
-          files,
-          job_name,
+          destination=table_reference,
+          source_uris=files,
+          job_id=job_name,
           schema=schema,
           write_disposition=self.write_disposition,
           create_disposition=create_disposition,

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index d61e01ab0a69..82ed8679ef90 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -69,6 +69,7 @@
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
+  from apitools.base.py.transfer import Upload
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
 except ImportError:
   pass
@@ -83,9 +84,9 @@

 _LOGGER = logging.getLogger(__name__)

-MAX_RETRIES = 3
-
 JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
+MAX_RETRIES = 3
+UNKNOWN_MIME_TYPE = 'application/octet-stream'


 class FileFormat(object):
@@ -407,13 +408,29 @@ def _insert_load_job(
       project_id,
       job_id,
       table_reference,
-      source_uris,
+      source_uris=None,
+      source_stream=None,
       schema=None,
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
       source_format=None,
       job_labels=None):
+
+    if not source_uris and not source_stream:
+      raise ValueError(
+          'Either a non-empty list of fully-qualified source URIs must be '
+          'provided via the source_uris parameter or an open file object must '
+          'be provided via the source_stream parameter. Got neither.')
+
+    if source_uris and source_stream:
+      raise ValueError(
+          'Only one of source_uris and source_stream may be specified. '
+          'Got both.')
+
+    if source_uris is None:
+      source_uris = []
+
     additional_load_parameters = additional_load_parameters or {}
     job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -435,18 +452,26 @@ def _insert_load_job(
             ),
             jobReference=reference,
         ))
-    return self._start_job(request).jobReference
+    return self._start_job(request, stream=source_stream).jobReference

   def _start_job(
       self,
-      request  # type: bigquery.BigqueryJobsInsertRequest
+      request,  # type: bigquery.BigqueryJobsInsertRequest
+      stream=None,
   ):
     """Inserts a BigQuery job. If the job exists already, it returns it.
+
+    Args:
+      request (bigquery.BigqueryJobsInsertRequest): An insert job request.
+      stream (IO[bytes]): A bytes IO object open for reading.
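+        When provided, the insert request is submitted together with a
+        media upload of the stream's contents instead of referencing
+        GCS source URIs.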
""" try: - response = self.client.jobs.Insert(request) + upload = None + if stream: + upload = Upload.FromStream(stream, mime_type=UNKNOWN_MIME_TYPE) + response = self.client.jobs.Insert(request, upload=upload) _LOGGER.info( "Stated BigQuery job: %s\n " "bq show -j --format=prettyjson --project_id=%s %s", @@ -809,8 +834,9 @@ def get_job(self, project, job_id, location=None): def perform_load_job( self, destination, - files, job_id, + source_uris=None, + source_stream=None, schema=None, write_disposition=None, create_disposition=None, @@ -822,11 +848,23 @@ def perform_load_job( Returns: bigquery.JobReference with the information about the job that was started. """ + if not source_uris and not source_stream: + raise ValueError( + 'Either a non-empty list of fully-qualified source URIs must be ' + 'provided via the source_uris parameter or an open file object must ' + 'be provided via the source_stream parameter. Got neither.') + + if source_uris and source_stream: + raise ValueError( + 'Only one of source_uris and source_stream may be specified. ' + 'Got both.') + return self._insert_load_job( destination.projectId, job_id, destination, - files, + source_uris=source_uris, + source_stream=source_stream, schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, From 1e253802b6ad75b0a46610de64c625178d47a6cd Mon Sep 17 00:00:00 2001 From: Chuck Yang Date: Sun, 28 Feb 2021 21:38:02 -0800 Subject: [PATCH 5/9] Add tests for BigQueryWrapper.perform_load_job --- .../apache_beam/io/gcp/bigquery_tools_test.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 777b5b2620b9..558c8738dd44 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -386,6 +386,35 @@ def test_get_query_location(self): project_id="second_project_id", query=query, use_legacy_sql=False) self.assertEqual("US", location) + def test_perform_load_job_source_mutual_exclusivity(self): + client = mock.Mock() + wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client) + + # Both source_uri and source_stream specified. + with self.assertRaises(ValueError): + wrapper.perform_load_job( + destination=parse_table_reference('project:dataset.table'), + job_id='job_id', + source_uris=['gs://example.com/*'], + source_stream=io.BytesIO()) + + # Neither source_uri nor source_stream specified. 
+    with self.assertRaises(ValueError):
+      wrapper.perform_load_job(destination='P:D.T', job_id='J')
+
+  def test_perform_load_job_with_source_stream(self):
+    client = mock.Mock()
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+
+    wrapper.perform_load_job(
+        destination=parse_table_reference('project:dataset.table'),
+        job_id='job_id',
+        source_stream=io.BytesIO(b'some,data'))
+
+    client.jobs.Insert.assert_called_once()
+    upload = client.jobs.Insert.call_args[1]["upload"]
+    self.assertEqual(b'some,data', upload.stream.read())
+

 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):

From 911945c7f70bb6387ccd72fb1f4f9684613b5681 Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Sun, 28 Feb 2021 23:11:16 -0800
Subject: [PATCH 6/9] Add integration test

---
 .../apache_beam/io/gcp/bigquery_write_it_test.py | 45 +++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index a5c1ce71e2a3..6a7c91bc8124 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -31,6 +31,7 @@
 from decimal import Decimal

 import hamcrest as hc
+import mock
 import pytz
 from future.utils import iteritems
 from nose.plugins.attrib import attr
@@ -354,6 +355,50 @@ def test_big_query_write_without_schema(self):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             temp_file_format=FileFormat.JSON))

+  @attr('IT')
+  @mock.patch(
+      "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
+  def test_big_query_write_temp_table_append_schema_update(self):
+    """
+    Test that schema update options are respected when appending to an
+    existing table via temporary tables.
+
+    _MAXIMUM_SOURCE_URIS and max_file_size are both set to 1 to force
+    multiple load jobs and usage of temporary tables.
+    """
+    table_name = 'python_append_schema_update'
+    self.create_table(table_name)
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [{"int64": 1, "bool": True}, {"int64": 2, "bool": False}]
+
+    table_schema = {
+        "fields": [{
+            "name": "int64", "type": "INT64"
+        }, {
+            "name": "bool", "type": "BOOL"
+        }]
+    }
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT bytes, date, time, int64, bool FROM %s" % table_id,
+            data=[(None, None, None, 1, True), (None, None, None, 2, False)]))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      (
+          p | 'create' >> beam.Create(input_data)
+          | 'write' >> beam.io.WriteToBigQuery(
+              table_id,
+              schema=table_schema,
+              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+              max_file_size=1,  # bytes
+              method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+              additional_bq_parameters={
+                  'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']}))
+

 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

From 0da053226e8d2eb1b5b9616429843d916e0db29f Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Fri, 19 Mar 2021 11:54:48 -0700
Subject: [PATCH 7/9] Use separate singletons in _load_data

https://github.com/apache/beam/pull/14113#discussion_r592802990

Reusing a single PCollection concentrates the different paths into a
single stage, which complicates firing of triggers for that stage.
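To make the linked review comment concrete: when every monitoring ParDo
hangs off one shared Create, a runner may fuse them into a single stage
whose triggers then all fire together. A minimal runnable sketch of the
separate-impulse shape this patch adopts (DoFn and step names are
illustrative only):

    import apache_beam as beam

    class Check(beam.DoFn):
      def process(self, unused_element):
        yield 'checked'

    with beam.Pipeline() as p:
      # Each monitor gets its own impulse, so each wait step can sit in
      # its own stage instead of sharing one fused stage.
      wait_a = (
          p
          | 'ImpulseA' >> beam.Create([None])
          | 'WaitA' >> beam.ParDo(Check()))
      wait_b = (
          p
          | 'ImpulseB' >> beam.Create([None])
          | 'WaitB' >> beam.ParDo(Check()))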
---
 .../apache_beam/io/gcp/bigquery_file_loads.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 1f4ed686ce17..90fd61585bd2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -977,8 +977,6 @@ def _load_data(
     copy jobs are not triggered.
     """
-    singleton_pc = p | "ImpulseLoadData" >> beam.Create([None])
-
     # Load data using temp tables
     trigger_loads_outputs = (
         partitions_using_temp_tables
@@ -1000,7 +998,8 @@ def _load_data(
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]

     finished_temp_tables_load_jobs_pc = (
-        singleton_pc
+        p
+        | "ImpulseMonitorLoadJobs" >> beam.Create([None])
         | "WaitForTempTableLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             pvalue.AsList(temp_tables_load_job_ids_pc)))
@@ -1016,7 +1015,8 @@ def _load_data(
         schema_mod_job_name_pcv))

     finished_schema_mod_jobs_pc = (
-        singleton_pc
+        p
+        | "ImpulseMonitorSchemaModJobs" >> beam.Create([None])
         | "WaitForSchemaModJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             pvalue.AsList(schema_mod_job_ids_pc)))
@@ -1033,13 +1033,15 @@ def _load_data(
             pvalue.AsIter(finished_schema_mod_jobs_pc)))

     finished_copy_jobs_pc = (
-        singleton_pc
+        p
+        | "ImpulseMonitorCopyJobs" >> beam.Create([None])
         | "WaitForCopyJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             pvalue.AsList(destination_copy_job_ids_pc)))

     _ = (
-        singleton_pc
+        p
+        | "RemoveTempTables/Impulse" >> beam.Create([None])
         | "RemoveTempTables/PassTables" >> beam.FlatMap(
             lambda _,
             unused_copy_jobs,
@@ -1069,7 +1071,8 @@ def _load_data(
             *self.schema_side_inputs))

     _ = (
-        singleton_pc
+        p
+        | "ImpulseMonitorDestinationLoadJobs" >> beam.Create([None])
         | "WaitForDestinationLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             pvalue.AsList(destination_load_job_ids_pc)))

From 6acd291c007d8d884ce00e1909292d9c9adaefc4 Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Fri, 19 Mar 2021 14:24:14 -0700
Subject: [PATCH 8/9] Avoid running a schema modification load job...

...when destination table schema matches temp table schema.
---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 90fd61585bd2..008a9b7dd3e1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -385,7 +385,7 @@ def process(self, element, schema_mod_job_name_prefix):

     try:
       # Check if destination table exists
-      _ = self._bq_wrapper.get_table(
+      destination_table = self._bq_wrapper.get_table(
           project_id=table_reference.projectId,
           dataset_id=table_reference.datasetId,
           table_id=table_reference.tableId)
@@ -403,6 +403,14 @@ def process(self, element, schema_mod_job_name_prefix):
         location=temp_table_load_job_reference.location)
     temp_table_schema = temp_table_load_job.configuration.load.schema

+    # FIXME: This short-circuit lacks specificity. Schemas differing only in
+    # the order of fields are not equivalent according to == but do not
+    # need a schema modification job to precede the copy job.
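+    # (Note: == compares apitools TableSchema messages field by field and is
+    # order-sensitive; an order-insensitive comparison is sketched after
+    # patch 9 below.)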
+    if temp_table_schema == destination_table.schema:
+    # Destination table schema is already the same as the temp table schema,
+    # so no need to run a job to update the destination table schema.
+    return
+
     destination_hash = _bq_uuid(
         '%s:%s.%s' % (
             table_reference.projectId,
             table_reference.datasetId,
             table_reference.tableId))

From b9ff38ecc21a73a02c143888dddc734188e5301c Mon Sep 17 00:00:00 2001
From: Chuck Yang
Date: Fri, 19 Mar 2021 14:32:40 -0700
Subject: [PATCH 9/9] Fix spacing

---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 008a9b7dd3e1..ee0cbd714a2d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -407,9 +407,9 @@ def process(self, element, schema_mod_job_name_prefix):
     # the order of fields are not equivalent according to == but do not
     # need a schema modification job to precede the copy job.
     if temp_table_schema == destination_table.schema:
-    # Destination table schema is already the same as the temp table schema,
-    # so no need to run a job to update the destination table schema.
-    return
+      # Destination table schema is already the same as the temp table schema,
+      # so no need to run a job to update the destination table schema.
+      return

     destination_hash = _bq_uuid(
         '%s:%s.%s' % (
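One way to address the FIXME from patch 8, sketched as a standalone helper
rather than as part of the series. The function name and the JSON-style
schema shape are assumptions for illustration (top-level fields only; the
real DoFn compares apitools TableSchema messages instead):

    def schemas_equivalent(schema_a, schema_b):
      """Compares two BigQuery schemas while ignoring field order."""
      def field_set(schema):
        # NULLABLE is BigQuery's default mode when none is given.
        return {(f['name'], f['type'], f.get('mode', 'NULLABLE'))
                for f in schema['fields']}
      return field_set(schema_a) == field_set(schema_b)

    # Schemas that differ only in field order compare equal, so a pure
    # reordering would no longer trigger a schema modification job.
    a = {'fields': [{'name': 'int64', 'type': 'INT64'},
                    {'name': 'bool', 'type': 'BOOL'}]}
    b = {'fields': [{'name': 'bool', 'type': 'BOOL'},
                    {'name': 'int64', 'type': 'INT64'}]}
    assert schemas_equivalent(a, b)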