diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 3db8ce4c26ed..ddc8fe61db04 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -335,7 +335,8 @@ class UpdateDestinationSchema(beam.DoFn):
 
   This transform emits (destination, job_reference) pairs where the
   job_reference refers to a submitted load job for performing the schema
-  modification. Note that the input and output job references are not the same.
+  modification in JSON format. Note that the input and output job references
+  are not the same.
 
   Experimental; no backwards compatibility guarantees.
   """
@@ -345,13 +346,11 @@ def __init__(
       test_client=None,
       additional_bq_parameters=None,
       step_name=None,
-      source_format=None,
       load_job_project_id=None):
     self._test_client = test_client
     self._write_disposition = write_disposition
     self._additional_bq_parameters = additional_bq_parameters or {}
     self._step_name = step_name
-    self._source_format = source_format
     self._load_job_project_id = load_job_project_id
 
   def setup(self):
@@ -362,7 +361,6 @@ def display_data(self):
     return {
         'write_disposition': str(self._write_disposition),
         'additional_bq_params': str(self._additional_bq_parameters),
-        'source_format': str(self._source_format),
     }
 
   def process(self, element, schema_mod_job_name_prefix):
@@ -441,7 +439,9 @@ def process(self, element, schema_mod_job_name_prefix):
         create_disposition='CREATE_NEVER',
         additional_load_parameters=additional_parameters,
         job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        source_format=self._source_format,
+        # JSON format is hardcoded because it permits loading zero rows
+        # (unlike AVRO) and a nested schema (unlike CSV, which does not).
+ source_format="NEWLINE_DELIMITED_JSON", load_job_project_id=self._load_job_project_id) yield (destination, schema_update_job_reference) @@ -1043,7 +1043,6 @@ def _load_data( test_client=self.test_client, additional_bq_parameters=self.additional_bq_parameters, step_name=step_name, - source_format=self._temp_file_format, load_job_project_id=self.load_job_project_id), schema_mod_job_name_pcv)) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index 43bc7c9fa1ed..dd2283eb71d6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -32,6 +32,8 @@ import mock import pytest import pytz +from parameterized import param +from parameterized import parameterized import apache_beam as beam from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper @@ -84,6 +86,11 @@ def tearDown(self): def create_table(self, table_name): table_schema = bigquery.TableSchema() table_field = bigquery.TableFieldSchema() + table_field.name = 'int64' + table_field.type = 'INT64' + table_field.mode = 'REQUIRED' + table_schema.fields.append(table_field) + table_field = bigquery.TableFieldSchema() table_field.name = 'bytes' table_field.type = 'BYTES' table_schema.fields.append(table_field) @@ -297,16 +304,25 @@ def test_big_query_write_without_schema(self): table_id = '{}.{}'.format(self.dataset_id, table_name) input_data = [{ - 'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999' - }, { - 'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00' + 'int64': 1, + 'bytes': b'xyw', + 'date': '2011-01-01', + 'time': '23:59:59.999999' }, { + 'int64': 2, + 'bytes': b'abc', + 'date': '2000-01-01', + 'time': '00:00:00' + }, + { + 'int64': 3, 'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31', 'time': '23:59:59' }, { + 'int64': 4, 'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00' @@ -318,22 +334,27 @@ def test_big_query_write_without_schema(self): pipeline_verifiers = [ BigqueryFullResultMatcher( project=self.project, - query="SELECT bytes, date, time FROM %s" % table_id, + query="SELECT int64, bytes, date, time FROM %s" % table_id, data=[( + 1, b'xyw', datetime.date(2011, 1, 1), datetime.time(23, 59, 59, 999999), - ), ( - b'abc', - datetime.date(2000, 1, 1), - datetime.time(0, 0, 0), ), ( + 2, + b'abc', + datetime.date(2000, 1, 1), + datetime.time(0, 0, 0), + ), + ( + 3, b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31), datetime.time(23, 59, 59), ), ( + 4, b'\xab\xac\xad', datetime.date(2000, 1, 1), datetime.time(0, 0, 0), @@ -353,12 +374,17 @@ def test_big_query_write_without_schema(self): temp_file_format=FileFormat.JSON)) @pytest.mark.it_postcommit + @parameterized.expand([ + param(file_format=FileFormat.AVRO), + param(file_format=FileFormat.JSON), + param(file_format=None), + ]) @mock.patch( "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1) - def test_big_query_write_temp_table_append_schema_update(self): + def test_big_query_write_temp_table_append_schema_update(self, file_format): """ - Test that schema update options are respected when appending to an existing - table via temporary tables. + Test that nested schema update options and schema relaxation + are respected when appending to an existing table via temporary tables. _MAXIMUM_SOURCE_URIS and max_file_size are both set to 1 to force multiple load jobs and usage of temporary tables. 
@@ -367,17 +393,16 @@ def test_big_query_write_temp_table_append_schema_update(self):
     self.create_table(table_name)
     table_id = '{}.{}'.format(self.dataset_id, table_name)
 
-    input_data = [{
-        "int64": num, "bool": True, "nested_field": {
-            "fruit": "Apple"
-        }
-    } for num in range(1, 3)]
-
+    # The bytes, date and time fields are optional and omitted in the test;
+    # only the required and new columns are specified.
     table_schema = {
         "fields": [{
-            "name": "int64", "type": "INT64"
+            "name": "int64",
+            "type": "INT64",
+            "mode": "NULLABLE",
         }, {
-            "name": "bool", "type": "BOOL"
+            "name": "bool",
+            "type": "BOOL",
         }, {
             "name": "nested_field",
@@ -392,18 +417,34 @@ def test_big_query_write_temp_table_append_schema_update(self):
             ]
         }]
     }
-
+    input_data = [{
+        "int64": 1, "bool": True, "nested_field": [{
+            "fruit": "Apple"
+        }]
+    }, {
+        "bool": False, "nested_field": [{
+            "fruit": "Mango"
+        }]
+    },
+    {
+        "int64": None,
+        "bool": True,
+        "nested_field": [{
+            "fruit": "Banana"
+        }]
+    }]
     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=BigqueryFullResultMatcher(
             project=self.project,
             query="""
             SELECT bytes, date, time, int64, bool, fruit
-            FROM %s,
+            FROM {},
             UNNEST(nested_field) as nested_field
-            ORDER BY int64
-            """ % table_id,
-            data=[(None, None, None, num, True, "Apple")
-                  for num in range(1, 3)]))
+            ORDER BY fruit
+            """.format(table_id),
+            data=[(None, None, None, 1, True,
+                   "Apple"), (None, None, None, None, True, "Banana"), (
+                       None, None, None, None, False, "Mango")]))
 
     with beam.Pipeline(argv=args) as p:
       # pylint: disable=expression-not-assigned
@@ -416,7 +457,9 @@ def test_big_query_write_temp_table_append_schema_update(self):
               max_file_size=1,  # bytes
               method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
               additional_bq_parameters={
-                  'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']}))
+                  'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION',
+                                          'ALLOW_FIELD_RELAXATION']},
+              temp_file_format=file_format))
 
 
 if __name__ == '__main__':
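
Note on the change above: the schema-modification job submitted by UpdateDestinationSchema
is effectively a zero-row load in NEWLINE_DELIMITED_JSON format with schemaUpdateOptions
set, run against the destination table before the copy jobs. A minimal standalone sketch
of the same technique, using the google-cloud-bigquery client rather than Beam's internal
BigQueryWrapper (the project/table ID and the relaxed schema below are illustrative
assumptions, not part of this change):

    import io

    from google.cloud import bigquery

    client = bigquery.Client()

    # Schema to apply: `int64` relaxed to NULLABLE, plus a repeated RECORD
    # field added (mirrors the schema used in the test above).
    new_schema = [
        bigquery.SchemaField("int64", "INT64", mode="NULLABLE"),
        bigquery.SchemaField("bool", "BOOL"),
        bigquery.SchemaField(
            "nested_field",
            "RECORD",
            mode="REPEATED",
            fields=[bigquery.SchemaField("fruit", "STRING")]),
    ]

    job_config = bigquery.LoadJobConfig(
        schema=new_schema,
        # JSON accepts an empty payload and a nested schema, which is the
        # reason the Beam change hardcodes this format.
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        create_disposition=bigquery.CreateDisposition.CREATE_NEVER,
        schema_update_options=[
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
            bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
        ])

    # An empty byte stream loads zero rows; BigQuery still applies the
    # requested field addition and mode relaxation to the table.
    load_job = client.load_table_from_file(
        io.BytesIO(b""),
        "my-project.my_dataset.my_table",  # hypothetical destination
        job_config=job_config)
    load_job.result()  # wait for the schema-update job to finish

Because no rows are loaded, the job's only observable effect is the schema change, which
is why a format that tolerates an empty input (unlike AVRO) and a nested schema (unlike
CSV) is required.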