diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 608301eedf8f..ee0cbd714a2d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -31,6 +31,7 @@
 from __future__ import absolute_import
 
 import hashlib
+import io
 import logging
 import random
 import time
@@ -50,6 +51,13 @@
 from apache_beam.transforms.util import GroupIntoBatches
 from apache_beam.transforms.window import GlobalWindows
 
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  pass
+
 _LOGGER = logging.getLogger(__name__)
 
 ONE_TERABYTE = (1 << 40)
@@ -316,6 +324,119 @@ def process(self, element, file_prefix, *schema_side_inputs):
     yield (destination, (file_path, file_size))
 
 
+class UpdateDestinationSchema(beam.DoFn):
+  """Update destination schema based on data that is about to be copied into it.
+
+  Unlike load and query jobs, BigQuery copy jobs do not support schema field
+  addition or relaxation on the destination table. This DoFn fills that gap by
+  updating the destination table schemas to be compatible with the data coming
+  from the source table so that schema field modification options are respected
+  regardless of whether data is loaded directly to the destination table or
+  loaded into temporary tables before being copied into the destination.
+
+  This transform takes as input a (destination, job_reference) pair where the
+  job_reference refers to a completed load job into a temporary table.
+
+  This transform emits (destination, job_reference) pairs where the
+  job_reference refers to a submitted load job for performing the schema
+  modification. Note that the input and output job references are not the same.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  def __init__(
+      self,
+      write_disposition=None,
+      test_client=None,
+      additional_bq_parameters=None,
+      step_name=None):
+    self._test_client = test_client
+    self._write_disposition = write_disposition
+    self._additional_bq_parameters = additional_bq_parameters or {}
+    self._step_name = step_name
+
+  def setup(self):
+    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+    self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
+  def process(self, element, schema_mod_job_name_prefix):
+    destination = element[0]
+    temp_table_load_job_reference = element[1]
+
+    if callable(self._additional_bq_parameters):
+      additional_parameters = self._additional_bq_parameters(destination)
+    elif isinstance(self._additional_bq_parameters, vp.ValueProvider):
+      additional_parameters = self._additional_bq_parameters.get()
+    else:
+      additional_parameters = self._additional_bq_parameters
+
+    # When writing to normal tables, WRITE_TRUNCATE will overwrite the schema,
+    # but when writing to a partition, care needs to be taken to update the
+    # schema even on WRITE_TRUNCATE.
+    if (self._write_disposition not in ('WRITE_TRUNCATE', 'WRITE_APPEND') or
+        not additional_parameters or
+        not additional_parameters.get("schemaUpdateOptions")):
+      # No need to modify schema of destination table
+      return
+
+    table_reference = bigquery_tools.parse_table_reference(destination)
+    if table_reference.projectId is None:
+      table_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+
+    try:
+      # Check if destination table exists
+      destination_table = self._bq_wrapper.get_table(
+          project_id=table_reference.projectId,
+          dataset_id=table_reference.datasetId,
+          table_id=table_reference.tableId)
+    except HttpError as exn:
+      if exn.status_code == 404:
+        # Destination table does not exist, so no need to modify its schema
+        # ahead of the copy jobs.
+        return
+      else:
+        raise
+
+    temp_table_load_job = self._bq_wrapper.get_job(
+        project=temp_table_load_job_reference.projectId,
+        job_id=temp_table_load_job_reference.jobId,
+        location=temp_table_load_job_reference.location)
+    temp_table_schema = temp_table_load_job.configuration.load.schema
+
+    # FIXME: This short-circuit lacks specificity. Schemas differing only in
+    # the order of fields are not equivalent according to == but do not
+    # need a schema modification job to precede the copy job.
+    if temp_table_schema == destination_table.schema:
+      # Destination table schema is already the same as the temp table schema,
+      # so no need to run a job to update the destination table schema.
+      return
+
+    destination_hash = _bq_uuid(
+        '%s:%s.%s' % (
+            table_reference.projectId,
+            table_reference.datasetId,
+            table_reference.tableId))
+    uid = _bq_uuid()
+    job_name = '%s_%s_%s' % (schema_mod_job_name_prefix, destination_hash, uid)
+
+    _LOGGER.debug(
+        'Triggering schema modification job %s on %s',
+        job_name,
+        table_reference)
+    # Trigger potential schema modification by loading zero rows into the
+    # destination table with the temporary table schema.
+    schema_update_job_reference = self._bq_wrapper.perform_load_job(
+        destination=table_reference,
+        source_stream=io.BytesIO(),  # file with zero rows
+        job_id=job_name,
+        schema=temp_table_schema,
+        write_disposition='WRITE_APPEND',
+        create_disposition='CREATE_NEVER',
+        additional_load_parameters=additional_parameters,
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels())
+    yield (destination, schema_update_job_reference)
+
+
 class TriggerCopyJobs(beam.DoFn):
   """Launches jobs to copy from temporary tables into the main target table.
@@ -355,7 +476,7 @@ def start_bundle(self):
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
 
-  def process(self, element, job_name_prefix=None):
+  def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None):
     destination = element[0]
     job_reference = element[1]
@@ -526,9 +647,9 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
     job_reference = self.bq_wrapper.perform_load_job(
-        table_reference,
-        files,
-        job_name,
+        destination=table_reference,
+        source_uris=files,
+        job_id=job_name,
         schema=schema,
         write_disposition=self.write_disposition,
         create_disposition=create_disposition,
@@ -845,6 +966,7 @@ def _load_data(
       partitions_using_temp_tables,
       partitions_direct_to_destination,
       load_job_name_pcv,
+      schema_mod_job_name_pcv,
       copy_job_name_pcv,
       p,
       step_name):
@@ -883,32 +1005,56 @@ def _load_data(
     temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
-    destination_copy_job_ids_pc = (
+    finished_temp_tables_load_jobs_pc = (
         p
         | "ImpulseMonitorLoadJobs" >> beam.Create([None])
         | "WaitForTempTableLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
-            beam.pvalue.AsList(temp_tables_load_job_ids_pc))
+            pvalue.AsList(temp_tables_load_job_ids_pc)))
+
+    schema_mod_job_ids_pc = (
+        finished_temp_tables_load_jobs_pc
+        | beam.ParDo(
+            UpdateDestinationSchema(
+                write_disposition=self.write_disposition,
+                test_client=self.test_client,
+                additional_bq_parameters=self.additional_bq_parameters,
+                step_name=step_name),
+            schema_mod_job_name_pcv))
+
+    finished_schema_mod_jobs_pc = (
+        p
+        | "ImpulseMonitorSchemaModJobs" >> beam.Create([None])
+        | "WaitForSchemaModJobs" >> beam.ParDo(
+            WaitForBQJobs(self.test_client),
+            pvalue.AsList(schema_mod_job_ids_pc)))
+
+    destination_copy_job_ids_pc = (
+        finished_temp_tables_load_jobs_pc
         | beam.ParDo(
             TriggerCopyJobs(
                 create_disposition=self.create_disposition,
                 write_disposition=self.write_disposition,
                 test_client=self.test_client,
                 step_name=step_name),
-            copy_job_name_pcv))
+            copy_job_name_pcv,
+            pvalue.AsIter(finished_schema_mod_jobs_pc)))
 
     finished_copy_jobs_pc = (
         p
         | "ImpulseMonitorCopyJobs" >> beam.Create([None])
        | "WaitForCopyJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
-            beam.pvalue.AsList(destination_copy_job_ids_pc)))
+            pvalue.AsList(destination_copy_job_ids_pc)))
 
     _ = (
-        finished_copy_jobs_pc
+        p
+        | "RemoveTempTables/Impulse" >> beam.Create([None])
         | "RemoveTempTables/PassTables" >> beam.FlatMap(
-            lambda x,
+            lambda _,
+            unused_copy_jobs,
             deleting_tables: deleting_tables,
+            pvalue.AsIter(finished_copy_jobs_pc),
             pvalue.AsIter(temp_tables_pc))
         | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
         | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
@@ -934,10 +1080,10 @@ def _load_data(
     _ = (
         p
-        | "ImpulseMonitorDestLoadJobs" >> beam.Create([None])
+        | "ImpulseMonitorDestinationLoadJobs" >> beam.Create([None])
        | "WaitForDestinationLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
-            beam.pvalue.AsList(destination_load_job_ids_pc)))
+            pvalue.AsList(destination_load_job_ids_pc)))
 
     destination_load_job_ids_pc = (
         (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)
@@ -966,6 +1112,14 @@ def expand(self, pcoll):
             lambda _: _generate_job_name(
                 job_name, bigquery_tools.BigQueryJobTypes.LOAD, 'LOAD_STEP')))
 
+    schema_mod_job_name_pcv = pvalue.AsSingleton(
+        singleton_pc
+        | "SchemaModJobNamePrefix" >> beam.Map(
+            lambda _: _generate_job_name(
+                job_name,
+                bigquery_tools.BigQueryJobTypes.LOAD,
+                'SCHEMA_MOD_STEP')))
+
     copy_job_name_pcv = pvalue.AsSingleton(
         singleton_pc
         | "CopyJobNamePrefix" >> beam.Map(
@@ -1022,6 +1176,7 @@ def expand(self, pcoll):
           self._load_data(all_partitions,
                           empty_pc,
                           load_job_name_pcv,
+                          schema_mod_job_name_pcv,
                           copy_job_name_pcv,
                           p,
                           step_name))
@@ -1030,6 +1185,7 @@ def expand(self, pcoll):
           self._load_data(multiple_partitions_per_destination_pc,
                           single_partition_per_destination_pc,
                           load_job_name_pcv,
+                          schema_mod_job_name_pcv,
                           copy_job_name_pcv,
                           p,
                           step_name))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index d61e01ab0a69..82ed8679ef90 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -69,6 +69,7 @@
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
+  from apitools.base.py.transfer import Upload
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
 except ImportError:
   pass
@@ -83,9 +84,9 @@
 _LOGGER = logging.getLogger(__name__)
 
-MAX_RETRIES = 3
-
 JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
+MAX_RETRIES = 3
+UNKNOWN_MIME_TYPE = 'application/octet-stream'
 
 
 class FileFormat(object):
@@ -407,13 +408,29 @@ def _insert_load_job(
       self,
       project_id,
       job_id,
       table_reference,
-      source_uris,
+      source_uris=None,
+      source_stream=None,
       schema=None,
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
       source_format=None,
       job_labels=None):
+
+    if not source_uris and not source_stream:
+      raise ValueError(
+          'Either a non-empty list of fully-qualified source URIs must be '
+          'provided via the source_uris parameter or an open file object must '
+          'be provided via the source_stream parameter. Got neither.')
+
+    if source_uris and source_stream:
+      raise ValueError(
+          'Only one of source_uris and source_stream may be specified. '
+          'Got both.')
+
+    if source_uris is None:
+      source_uris = []
+
     additional_load_parameters = additional_load_parameters or {}
     job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -435,18 +452,26 @@
             ),
             jobReference=reference,
         ))
-    return self._start_job(request).jobReference
+    return self._start_job(request, stream=source_stream).jobReference
 
   def _start_job(
       self,
-      request  # type: bigquery.BigqueryJobsInsertRequest
+      request,  # type: bigquery.BigqueryJobsInsertRequest
+      stream=None,
   ):
     """Inserts a BigQuery job. If the job exists already, it returns it.
+
+    Args:
+      request (bigquery.BigqueryJobsInsertRequest): An insert job request.
+      stream (IO[bytes]): A bytes IO object open for reading.
""" try: - response = self.client.jobs.Insert(request) + upload = None + if stream: + upload = Upload.FromStream(stream, mime_type=UNKNOWN_MIME_TYPE) + response = self.client.jobs.Insert(request, upload=upload) _LOGGER.info( "Stated BigQuery job: %s\n " "bq show -j --format=prettyjson --project_id=%s %s", @@ -809,8 +834,9 @@ def get_job(self, project, job_id, location=None): def perform_load_job( self, destination, - files, job_id, + source_uris=None, + source_stream=None, schema=None, write_disposition=None, create_disposition=None, @@ -822,11 +848,23 @@ def perform_load_job( Returns: bigquery.JobReference with the information about the job that was started. """ + if not source_uris and not source_stream: + raise ValueError( + 'Either a non-empty list of fully-qualified source URIs must be ' + 'provided via the source_uris parameter or an open file object must ' + 'be provided via the source_stream parameter. Got neither.') + + if source_uris and source_stream: + raise ValueError( + 'Only one of source_uris and source_stream may be specified. ' + 'Got both.') + return self._insert_load_job( destination.projectId, job_id, destination, - files, + source_uris=source_uris, + source_stream=source_stream, schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 777b5b2620b9..558c8738dd44 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -386,6 +386,35 @@ def test_get_query_location(self): project_id="second_project_id", query=query, use_legacy_sql=False) self.assertEqual("US", location) + def test_perform_load_job_source_mutual_exclusivity(self): + client = mock.Mock() + wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client) + + # Both source_uri and source_stream specified. + with self.assertRaises(ValueError): + wrapper.perform_load_job( + destination=parse_table_reference('project:dataset.table'), + job_id='job_id', + source_uris=['gs://example.com/*'], + source_stream=io.BytesIO()) + + # Neither source_uri nor source_stream specified. 
+    with self.assertRaises(ValueError):
+      wrapper.perform_load_job(destination='P:D.T', job_id='J')
+
+  def test_perform_load_job_with_source_stream(self):
+    client = mock.Mock()
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+
+    wrapper.perform_load_job(
+        destination=parse_table_reference('project:dataset.table'),
+        job_id='job_id',
+        source_stream=io.BytesIO(b'some,data'))
+
+    client.jobs.Insert.assert_called_once()
+    upload = client.jobs.Insert.call_args[1]["upload"]
+    self.assertEqual(b'some,data', upload.stream.read())
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index a5c1ce71e2a3..6a7c91bc8124 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -31,6 +31,7 @@
 from decimal import Decimal
 
 import hamcrest as hc
+import mock
 import pytz
 from future.utils import iteritems
 from nose.plugins.attrib import attr
@@ -354,6 +355,50 @@ def test_big_query_write_without_schema(self):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             temp_file_format=FileFormat.JSON))
 
+  @attr('IT')
+  @mock.patch(
+      "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
+  def test_big_query_write_temp_table_append_schema_update(self):
+    """
+    Test that schema update options are respected when appending to an existing
+    table via temporary tables.
+
+    _MAXIMUM_SOURCE_URIS and max_file_size are both set to 1 to force multiple
+    load jobs and usage of temporary tables.
+    """
+    table_name = 'python_append_schema_update'
+    self.create_table(table_name)
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [{"int64": 1, "bool": True}, {"int64": 2, "bool": False}]
+
+    table_schema = {
+        "fields": [{
+            "name": "int64", "type": "INT64"
+        }, {
+            "name": "bool", "type": "BOOL"
+        }]
+    }
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT bytes, date, time, int64, bool FROM %s" % table_id,
+            data=[(None, None, None, 1, True), (None, None, None, 2, False)]))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      (
+          p | 'create' >> beam.Create(input_data)
+          | 'write' >> beam.io.WriteToBigQuery(
+              table_id,
+              schema=table_schema,
+              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+              max_file_size=1,  # bytes
+              method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+              additional_bq_parameters={
+                  'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']}))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
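
Note (not part of the patch): the sketch below shows one way the new source_stream path added in bigquery_tools.py might be exercised directly against BigQueryWrapper, mirroring the zero-row load that UpdateDestinationSchema performs ahead of the copy jobs. The project, dataset, table, job id and the extra 'added_later' field are hypothetical placeholders; the snippet assumes application-default GCP credentials and the apitools-based client that BigQueryWrapper constructs by default.

import io
import json

from apache_beam.io.gcp import bigquery_tools

# Hypothetical destination table; replace with a real project:dataset.table.
wrapper = bigquery_tools.BigQueryWrapper()
table = bigquery_tools.parse_table_reference('my-project:my_dataset.my_table')

# Widened schema: 'added_later' is assumed to be absent from the destination.
new_schema = bigquery_tools.parse_table_schema_from_json(
    json.dumps({
        'fields': [
            {'name': 'int64', 'type': 'INT64', 'mode': 'NULLABLE'},
            {'name': 'added_later', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]
    }))

# Loading an empty byte stream with WRITE_APPEND and ALLOW_FIELD_ADDITION adds
# the missing field without writing any rows; the returned reference points at
# an asynchronous load job that can be polled for completion.
job_ref = wrapper.perform_load_job(
    destination=table,
    job_id='schema_mod_example_0001',  # hypothetical job id
    source_stream=io.BytesIO(),  # zero rows
    schema=new_schema,
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_NEVER',
    additional_load_parameters={
        'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']})
print('Submitted schema modification job: %s' % job_ref.jobId)

Because source_stream is set, _insert_load_job hands the stream to apitools' Upload.FromStream and attaches it to the jobs.Insert request, which is also why source_uris and source_stream are rejected when passed together.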