From 9d3a354f48032e237bdc67b49761e3ee612fd5d1 Mon Sep 17 00:00:00 2001 From: Chuck Yang Date: Sat, 27 Feb 2021 00:04:58 -0800 Subject: [PATCH] Pass str rather than TableReference --- .../apache_beam/io/gcp/bigquery_file_loads.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 044988010fa3..298ead6b107e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -32,7 +32,6 @@ import hashlib import logging -import pickle import random import time import uuid @@ -256,7 +255,7 @@ def process(self, element, file_prefix, *schema_side_inputs): self._destination_to_file_writer.pop(destination) yield pvalue.TaggedOutput( WriteRecordsToFile.WRITTEN_FILE_TAG, - (element[0], (file_path, file_size))) + (destination, (file_path, file_size))) def finish_bundle(self): for destination, file_path_writer in \ @@ -287,7 +286,7 @@ def __init__( self.file_format = file_format or bigquery_tools.FileFormat.JSON def process(self, element, file_prefix, *schema_side_inputs): - destination = element[0] + destination = bigquery_tools.get_hashable_destination(element[0]) rows = element[1] file_path, writer = None, None @@ -509,7 +508,9 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs): create_disposition = 'CREATE_IF_NEEDED' # For temporary tables, we create a new table with the name with JobId. table_reference.tableId = job_name - yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference) + yield pvalue.TaggedOutput( + TriggerLoadJobs.TEMP_TABLES, + bigquery_tools.get_hashable_destination(table_reference)) _LOGGER.info( 'Triggering job %s to load data to BigQuery table %s.' @@ -904,13 +905,9 @@ def _load_data( lambda x, deleting_tables: deleting_tables, pvalue.AsIter(temp_tables_pc)) - # TableReference has no deterministic coder, but as this de-duplication - # is best-effort, pickling should be good enough. - | "RemoveTempTables/AddUselessValue" >> - beam.Map(lambda x: (pickle.dumps(x), None)) + | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None)) | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey() - | "RemoveTempTables/GetTableNames" >> - beam.MapTuple(lambda k, nones: pickle.loads(k)) + | "RemoveTempTables/GetTableNames" >> beam.Keys() | "RemoveTempTables/Delete" >> beam.ParDo( DeleteTablesFn(self.test_client)))