From f5cee03fcd713d04fd7124532db170cb89421896 Mon Sep 17 00:00:00 2001
From: Sayat Satybaldiyev
Date: Tue, 7 Dec 2021 10:57:21 -0800
Subject: [PATCH 1/6] Fix temporary file format in WriteToBigQuery

---
 .../apache_beam/io/gcp/bigquery_file_loads.py |  13 +--
 .../io/gcp/bigquery_write_it_test.py          | 110 +++++++++++++++++-
 2 files changed, 111 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index e211ab4ebf8d..bf8565f46fbd 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -335,7 +335,8 @@ class UpdateDestinationSchema(beam.DoFn):
 
   This transform emits (destination, job_reference) pairs where the
   job_reference refers to a submitted load job for performing the schema
-  modification. Note that the input and output job references are not the same.
+  modification (performed via a load job in JSON format). Note that the
+  input and output job references are not the same.
 
   Experimental; no backwards compatibility guarantees.
   """
@@ -344,13 +345,11 @@ def __init__(
       write_disposition=None,
       test_client=None,
       additional_bq_parameters=None,
-      step_name=None,
-      source_format=None):
+      step_name=None):
     self._test_client = test_client
     self._write_disposition = write_disposition
     self._additional_bq_parameters = additional_bq_parameters or {}
     self._step_name = step_name
-    self._source_format = source_format
 
   def setup(self):
     self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
@@ -360,7 +359,6 @@ def display_data(self):
     return {
         'write_disposition': str(self._write_disposition),
         'additional_bq_params': str(self._additional_bq_parameters),
-        'source_format': str(self._source_format),
     }
 
   def process(self, element, schema_mod_job_name_prefix):
@@ -439,7 +437,9 @@ def process(self, element, schema_mod_job_name_prefix):
         create_disposition='CREATE_NEVER',
         additional_load_parameters=additional_parameters,
         job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        source_format=self._source_format)
+        # JSON format is hardcoded because the zero rows load with a nested schema
+        # are permitted which is in contrast with the AVRO format.
+        source_format="NEWLINE_DELIMITED_JSON")
 
     yield (destination, schema_update_job_reference)
 
@@ -1028,7 +1028,6 @@ def _load_data(
                 test_client=self.test_client,
                 additional_bq_parameters=self.additional_bq_parameters,
                 step_name=step_name,
-                source_format=self._temp_file_format,
             ),
             schema_mod_job_name_pcv))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 43bc7c9fa1ed..7f263e6ec66f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -32,6 +32,8 @@
 import mock
 import pytest
 import pytz
+from parameterized import param
+from parameterized import parameterized
 
 import apache_beam as beam
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -39,6 +41,7 @@
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.io.gcp.bigquery import BigQueryDisposition
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -353,9 +356,14 @@ def test_big_query_write_without_schema(self):
             temp_file_format=FileFormat.JSON))
 
   @pytest.mark.it_postcommit
+  @parameterized.expand([
+      param(file_format=FileFormat.AVRO),
+      param(file_format=FileFormat.JSON),
+      param(file_format=None),
+  ])
   @mock.patch(
       "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
-  def test_big_query_write_temp_table_append_schema_update(self):
+  def test_big_query_write_temp_table_append_schema_update(self, file_format):
     """
     Test that schema update options are respected when appending to an
     existing table via temporary tables.
@@ -368,9 +376,9 @@ def test_big_query_write_temp_table_append_schema_update(self):
     table_id = '{}.{}'.format(self.dataset_id, table_name)
 
     input_data = [{
-        "int64": num, "bool": True, "nested_field": {
+        "int64": num, "bool": True, "nested_field": [{
             "fruit": "Apple"
-        }
+        }]
     } for num in range(1, 3)]
 
     table_schema = {
@@ -412,11 +420,103 @@ def test_big_query_write_temp_table_append_schema_update(self):
         | 'write' >> beam.io.WriteToBigQuery(
             table_id,
             schema=table_schema,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+            write_disposition=BigQueryDisposition.WRITE_APPEND,
             max_file_size=1,  # bytes
             method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
             additional_bq_parameters={
-                'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']}))
+                'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']},
+            temp_file_format=file_format))
+
+  @pytest.mark.it_postcommit
+  @parameterized.expand([
+      param(file_format=FileFormat.AVRO),
+      param(file_format=FileFormat.JSON),
+      param(file_format=None),
+  ])
+  @mock.patch(
+      "apache_beam.io.gcp.bigquery_file_loads._DEFAULT_MAX_FILE_SIZE", new=1)
+  @mock.patch(
+      "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
+  def test_append_schema_change_with_temporary_tables(self, file_format):
+    """
+    Test that schema update options are respected when appending to an
+    existing table via temporary tables with JSON or AVRO load files.
+
+    _MAXIMUM_SOURCE_URIS and _DEFAULT_MAX_FILE_SIZE are both set to 1 to
+    force multiple load jobs and usage of temporary tables.
+    """
+
+    # Define table
+    table_name = 'python_append_schema_with_temp_tables'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    table_schema = {
+        "fields": [{
+            "name": "col_int64", "type": "INT64"
+        }, {
+            "name": "col_string", "type": "STRING"
+        }]
+    }
+    data = [{"col_int64": 3, "col_string": "beam"}]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=BigqueryFullResultMatcher(
+            project=self.project,
+            query="""
+            SELECT col_int64, col_string
+            FROM {}
+            ORDER BY col_int64
+            """.format(table_id),
+            data=[(3, "beam")]))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      p | beam.Create(data) | beam.io.WriteToBigQuery(
+          table=table_id,
+          schema=table_schema,
+          write_disposition=BigQueryDisposition.WRITE_APPEND,
+          temp_file_format=file_format)
+
+    # Append new data with different schema
+    data = [{
+        "col_string": "terra", "col_float64": 0.23
+    }, {
+        "col_string": None, "col_float64": 0.64
+    }]
+    updated_table_schema = {
+        "fields": [{
+            "name": "col_int64", "type": "INT64"
+        }, {
+            "name": "col_string", "type": "STRING", "mode": "NULLABLE"
+        }, {
+            "name": "col_float64", "type": "FLOAT64", "mode": "NULLABLE"
+        }]
+    }
+    updated_args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=BigqueryFullResultMatcher(
+            project=self.project,
+            query="""
+            SELECT col_int64, col_string, col_float64 FROM `{}`
+            ORDER BY 2
+            """.format(table_id),
+            data=[
+                (None, "terra", 0.23),
+                (3, "beam", None),
+                (None, None, 0.64),
+            ]))
+    with beam.Pipeline(argv=updated_args) as p:
+      # pylint: disable=expression-not-assigned
+      (
+          p
+          | beam.Create(data)
+          | beam.io.WriteToBigQuery(
+              table=table_id,
+              write_disposition=BigQueryDisposition.WRITE_APPEND,
+              schema=updated_table_schema,
+              additional_bq_parameters={
+                  'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']
+              },
+              temp_file_format=file_format))
 
 
 if __name__ == '__main__':

From 3305599d8f735e113ad836753a0ce5913f6d254d Mon Sep 17 00:00:00 2001
From: Sayat Satybaldiyev
Date: Tue, 7 Dec 2021 11:35:52 -0800
Subject: [PATCH 2/6] Change a description

---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index bf8565f46fbd..88410f1390bb 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -437,8 +437,8 @@ def process(self, element, schema_mod_job_name_prefix):
         create_disposition='CREATE_NEVER',
         additional_load_parameters=additional_parameters,
         job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because the zero rows load with a nested schema
-        # are permitted which is in contrast with the AVRO format.
+        # JSON format is hardcoded because it permits loading zero rows (unlike
+        # AVRO) and nested schemas (unlike CSV, which is the default format).
         source_format="NEWLINE_DELIMITED_JSON")
 
     yield (destination, schema_update_job_reference)

From a4ce4444fe0a313bd1abbbaa8224178d6e15e7d5 Mon Sep 17 00:00:00 2001
From: Sayat Satybaldiyev
Date: Tue, 7 Dec 2021 15:17:10 -0800
Subject: [PATCH 3/6] Fix pylint issue

---
 sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 7f263e6ec66f..268d47e04ba5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -36,12 +36,12 @@
 from parameterized import parameterized
 
 import apache_beam as beam
+from apache_beam.io.gcp.bigquery import BigQueryDisposition
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import FileFormat
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.io.gcp.bigquery import BigQueryDisposition
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position

From 7565f23a697ea19d10e48b4c13cf4dda55c4d10f Mon Sep 17 00:00:00 2001
From: Sayat Satybaldiyev
Date: Wed, 15 Dec 2021 11:08:29 -0800
Subject: [PATCH 4/6] Use beam.io.BigQueryDisposition instead of a direct import

---
 sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 268d47e04ba5..41d85b634a8b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -36,7 +36,6 @@
 from parameterized import parameterized
 
 import apache_beam as beam
-from apache_beam.io.gcp.bigquery import BigQueryDisposition
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.bigquery_tools import FileFormat
 from apache_beam.io.gcp.internal.clients import bigquery
@@ -474,7 +473,7 @@ def test_append_schema_change_with_temporary_tables(self, file_format):
       p | beam.Create(data) | beam.io.WriteToBigQuery(
           table=table_id,
           schema=table_schema,
-          write_disposition=BigQueryDisposition.WRITE_APPEND,
+          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           temp_file_format=file_format)

From 404eaf71a858be1989b67d96fc3e9868dbc19c5d Mon Sep 17 00:00:00 2001
From: Sayat Satybaldiyev
Date: Wed, 15 Dec 2021 15:31:08 -0800
Subject: [PATCH 5/6] Combine both tests together

---
 .../io/gcp/bigquery_write_it_test.py          | 179 ++++++------------
 1 file changed, 62 insertions(+), 117 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 41d85b634a8b..ec046a14bce9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -86,6 +86,11 @@ def tearDown(self):
   def create_table(self, table_name):
     table_schema = bigquery.TableSchema()
     table_field = bigquery.TableFieldSchema()
+    table_field.name = 'int64'
+    table_field.type = 'INT64'
+    table_field.mode = 'REQUIRED'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
     table_field.name = 'bytes'
     table_field.type = 'BYTES'
     table_schema.fields.append(table_field)
@@ -299,16 +304,25 @@ def test_big_query_write_without_schema(self):
     table_id = '{}.{}'.format(self.dataset_id, table_name)
 
     input_data = [{
-        'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'
-    }, {
-        'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'
+        'int64': 1,
+        'bytes': b'xyw',
+        'date': '2011-01-01',
+        'time': '23:59:59.999999'
     }, {
+        'int64': 2,
+        'bytes': b'abc',
+        'date': '2000-01-01',
+        'time': '00:00:00'
+    },
+    {
+        'int64': 3,
         'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd',
         'date': '3000-12-31',
         'time': '23:59:59'
     }, {
+        'int64': 4,
         'bytes': b'\xab\xac\xad',
         'date': '2000-01-01',
         'time': '00:00:00'
@@ -320,22 +334,27 @@ def test_big_query_write_without_schema(self):
     pipeline_verifiers = [
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT bytes, date, time FROM %s" % table_id,
+            query="SELECT int64, bytes, date, time FROM %s" % table_id,
            data=[(
+                1,
                 b'xyw',
                 datetime.date(2011, 1, 1),
                 datetime.time(23, 59, 59, 999999),
-            ), (
-                b'abc',
-                datetime.date(2000, 1, 1),
-                datetime.time(0, 0, 0),
             ), (
+                2,
+                b'abc',
+                datetime.date(2000, 1, 1),
+                datetime.time(0, 0, 0),
+            ),
+            (
+                3,
                 b'\xe4\xbd\xa0\xe5\xa5\xbd',
                 datetime.date(3000, 12, 31),
                 datetime.time(23, 59, 59),
             ), (
+                4,
                 b'\xab\xac\xad',
                 datetime.date(2000, 1, 1),
                 datetime.time(0, 0, 0),
@@ -364,27 +383,28 @@ def test_big_query_write_without_schema(self):
       "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
   def test_big_query_write_temp_table_append_schema_update(self, file_format):
     """
-    Test that schema update options are respected when appending to an existing
-    table via temporary tables.
+    Test that nested schema update options and schema relaxation[1] are respected
+    when appending to an existing table via temporary tables.
 
     _MAXIMUM_SOURCE_URIS and max_file_size are both set to 1 to
     force multiple load jobs and usage of temporary tables.
+
+    [1]: https://cloud.google.com/bigquery/docs/managing-table-schemas#changing_required_to_nullable_in_a_load_or_query_job
     """
     table_name = 'python_append_schema_update'
     self.create_table(table_name)
     table_id = '{}.{}'.format(self.dataset_id, table_name)
 
-    input_data = [{
-        "int64": num, "bool": True, "nested_field": [{
-            "fruit": "Apple"
-        }]
-    } for num in range(1, 3)]
-
+    # bytes, date, time fields are optional and omitted in the test
+    # only required and new columns are specified
     table_schema = {
         "fields": [{
-            "name": "int64", "type": "INT64"
+            "name": "int64",
+            "type": "INT64",
+            "mode": "NULLABLE",
         }, {
-            "name": "bool", "type": "BOOL"
+            "name": "bool",
+            "type": "BOOL",
         }, {
             "name": "nested_field",
@@ -399,18 +419,34 @@ def test_big_query_write_temp_table_append_schema_update(self, file_format):
             ]
         }]
     }
-
+    input_data = [{
+        "int64": 1, "bool": True, "nested_field": [{
+            "fruit": "Apple"
+        }]
+    }, {
+        "bool": False, "nested_field": [{
+            "fruit": "Mango"
+        }]
+    },
+    {
+        "int64": None,
+        "bool": True,
+        "nested_field": [{
+            "fruit": "Banana"
+        }]
+    }]
     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=BigqueryFullResultMatcher(
             project=self.project,
             query="""
             SELECT bytes, date, time, int64, bool, fruit
-            FROM %s,
+            FROM {},
             UNNEST(nested_field) as nested_field
-            ORDER BY int64
-            """ % table_id,
-            data=[(None, None, None, num, True, "Apple")
-                  for num in range(1, 3)]))
+            ORDER BY fruit
+            """.format(table_id),
+            data=[(None, None, None, 1, True,
+                   "Apple"), (None, None, None, None, True, "Banana"), (
+                       None, None, None, None, False, "Mango")]))
 
     with beam.Pipeline(argv=args) as p:
       # pylint: disable=expression-not-assigned
@@ -419,102 +455,11 @@ def test_big_query_write_temp_table_append_schema_update(self, file_format):
         | 'write' >> beam.io.WriteToBigQuery(
             table_id,
             schema=table_schema,
-            write_disposition=BigQueryDisposition.WRITE_APPEND,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             max_file_size=1,  # bytes
             method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
             additional_bq_parameters={
-                'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']},
-            temp_file_format=file_format))
-
-  @pytest.mark.it_postcommit
-  @parameterized.expand([
-      param(file_format=FileFormat.AVRO),
-      param(file_format=FileFormat.JSON),
-      param(file_format=None),
-  ])
-  @mock.patch(
-      "apache_beam.io.gcp.bigquery_file_loads._DEFAULT_MAX_FILE_SIZE", new=1)
-  @mock.patch(
-      "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
-  def test_append_schema_change_with_temporary_tables(self, file_format):
-    """
-    Test that schema update options are respected when appending to an
-    existing table via temporary tables with JSON or AVRO load files.
-
-    _MAXIMUM_SOURCE_URIS and _DEFAULT_MAX_FILE_SIZE are both set to 1 to
-    force multiple load jobs and usage of temporary tables.
- """ - - # Define table - table_name = 'python_append_schema_with_temp_tables' - table_id = '{}.{}'.format(self.dataset_id, table_name) - - table_schema = { - "fields": [{ - "name": "col_int64", "type": "INT64" - }, { - "name": "col_string", "type": "STRING" - }] - } - data = [{"col_int64": 3, "col_string": "beam"}] - - args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=BigqueryFullResultMatcher( - project=self.project, - query=""" - SELECT col_int64, col_string - FROM {} - ORDER BY col_int64 - """.format(table_id), - data=[(3, "beam")])) - - with beam.Pipeline(argv=args) as p: - # pylint: disable=expression-not-assigned - p | beam.Create(data) | beam.io.WriteToBigQuery( - table=table_id, - schema=table_schema, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, - temp_file_format=file_format) - - # Append new data with different schema - data = [{ - "col_string": "terra", "col_float64": 0.23 - }, { - "col_string": None, "col_float64": 0.64 - }] - updated_table_schema = { - "fields": [{ - "name": "col_int64", "type": "INT64" - }, { - "name": "col_string", "type": "STRING", "mode": "NULLABLE" - }, { - "name": "col_float64", "type": "FLOAT64", "mode": "NULLABLE" - }] - } - updated_args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=BigqueryFullResultMatcher( - project=self.project, - query=""" - SELECT col_int64, col_string, col_float64 FROM `{}` - ORDER BY 2 - """.format(table_id), - data=[ - (None, "terra", 0.23), - (3, "beam", None), - (None, None, 0.64), - ])) - with beam.Pipeline(argv=updated_args) as p: - # pylint: disable=expression-not-assigned - ( - p - | beam.Create(data) - | beam.io.WriteToBigQuery( - table=table_id, - write_disposition=BigQueryDisposition.WRITE_APPEND, - schema=updated_table_schema, - additional_bq_parameters={ - 'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION'] - }, + 'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION']}, temp_file_format=file_format)) From 1512b9894162ffe472950ab1fd9c454339900315 Mon Sep 17 00:00:00 2001 From: Sayat Satybaldiyev Date: Wed, 15 Dec 2021 15:44:37 -0800 Subject: [PATCH 6/6] Fix lint issues --- sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index ec046a14bce9..dd2283eb71d6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -383,13 +383,11 @@ def test_big_query_write_without_schema(self): "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1) def test_big_query_write_temp_table_append_schema_update(self, file_format): """ - Test that nested schema update options and schema relaxation[1] are respected - when appending to an existing table via temporary tables. + Test that nested schema update options and schema relaxation + are respected when appending to an existing table via temporary tables. _MAXIMUM_SOURCE_URIS and max_file_size are both set to 1 to force multiple load jobs and usage of temporary tables. 
- - [1]: https://cloud.google.com/bigquery/docs/managing-table-schemas#changing_required_to_nullable_in_a_load_or_query_job """ table_name = 'python_append_schema_update' self.create_table(table_name) @@ -459,7 +457,8 @@ def test_big_query_write_temp_table_append_schema_update(self, file_format): max_file_size=1, # bytes method=beam.io.WriteToBigQuery.Method.FILE_LOADS, additional_bq_parameters={ - 'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION']}, + 'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION', + 'ALLOW_FIELD_RELAXATION']}, temp_file_format=file_format))