diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index e211ab4ebf8d..57204d4d5e61 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -344,25 +344,16 @@ def __init__(
       write_disposition=None,
       test_client=None,
       additional_bq_parameters=None,
-      step_name=None,
-      source_format=None):
+      step_name=None):
     self._test_client = test_client
     self._write_disposition = write_disposition
     self._additional_bq_parameters = additional_bq_parameters or {}
     self._step_name = step_name
-    self._source_format = source_format

   def setup(self):
     self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
     self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)

-  def display_data(self):
-    return {
-        'write_disposition': str(self._write_disposition),
-        'additional_bq_params': str(self._additional_bq_parameters),
-        'source_format': str(self._source_format),
-    }
-
   def process(self, element, schema_mod_job_name_prefix):
     destination = element[0]
     temp_table_load_job_reference = element[1]
@@ -424,7 +415,7 @@ def process(self, element, schema_mod_job_name_prefix):
     uid = _bq_uuid()
     job_name = '%s_%s_%s' % (schema_mod_job_name_prefix, destination_hash, uid)

-    _LOGGER.info(
+    _LOGGER.debug(
         'Triggering schema modification job %s on %s',
         job_name,
         table_reference)
@@ -438,8 +429,7 @@ def process(self, element, schema_mod_job_name_prefix):
         write_disposition='WRITE_APPEND',
         create_disposition='CREATE_NEVER',
         additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        source_format=self._source_format)
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels())

     yield (destination, schema_update_job_reference)

@@ -583,8 +573,7 @@ def display_data(self):
         'additional_bq_params': str(self.additional_bq_parameters),
         'schema': str(self.schema),
         'launchesBigQueryJobs': DisplayDataItem(
-            True, label="This Dataflow job launches bigquery jobs."),
-        'source_format': str(self.source_format),
+            True, label="This Dataflow job launches bigquery jobs.")
     }
     return result

@@ -630,7 +619,8 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
             table_reference.tableId))
     uid = _bq_uuid()
     job_name = '%s_%s_%s' % (load_job_name_prefix, destination_hash, uid)
-    _LOGGER.info('Load job has %s files. Job name is %s.', len(files), job_name)
+    _LOGGER.debug(
+        'Load job has %s files. Job name is %s.', len(files), job_name)

     create_disposition = self.create_disposition
     if self.temporary_tables:
@@ -645,13 +635,11 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):

     _LOGGER.info(
         'Triggering job %s to load data to BigQuery table %s.'
-        'Schema: %s. Additional parameters: %s. Source format: %s',
+        'Schema: %s. Additional parameters: %s',
         job_name,
         table_reference,
         schema,
-        additional_parameters,
-        self.source_format,
-    )
+        additional_parameters)
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
     job_reference = self.bq_wrapper.perform_load_job(
@@ -1027,9 +1015,7 @@ def _load_data(
                 write_disposition=self.write_disposition,
                 test_client=self.test_client,
                 additional_bq_parameters=self.additional_bq_parameters,
-                step_name=step_name,
-                source_format=self._temp_file_format,
-            ),
+                step_name=step_name),
             schema_mod_job_name_pcv))

     finished_schema_mod_jobs_pc = (
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 3e0c641fb168..c7f1d442057a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -367,40 +367,21 @@ def test_big_query_write_temp_table_append_schema_update(self):
     self.create_table(table_name)
     table_id = '{}.{}'.format(self.dataset_id, table_name)

-    input_data = [{
-        "int64": num, "bool": True, "nested_field": {
-            "fruit": "Apple"
-        }
-    } for num in range(1, 3)]
+    input_data = [{"int64": 1, "bool": True}, {"int64": 2, "bool": False}]

     table_schema = {
         "fields": [{
             "name": "int64", "type": "INT64"
         }, {
             "name": "bool", "type": "BOOL"
-        },
-                   {
-                       "name": "nested_field",
-                       "type": "RECORD",
-                       "mode": "REPEATED",
-                       "fields": [
-                           {
-                               "name": "fruit",
-                               "type": "STRING",
-                               "mode": "NULLABLE"
-                           },
-                       ]
-                   }]
+        }]
     }

     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=BigqueryFullResultMatcher(
             project=self.project,
-            query=
-            "SELECT bytes, date, time, int64, bool, nested_field.fruit FROM %s"
-            % table_id,
-            data=[(None, None, None, num, True, "Apple")
-                  for num in range(1, 3)]))
+            query="SELECT bytes, date, time, int64, bool FROM %s" % table_id,
+            data=[(None, None, None, 1, True), (None, None, None, 2, False)]))

     with beam.Pipeline(argv=args) as p:
       # pylint: disable=expression-not-assigned