diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 7310bbdc9fb6..0ee0822fd300 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2762,11 +2762,22 @@ def expand(self, input):
         failed_rows=failed_rows,
         failed_rows_with_errors=failed_rows_with_errors)
 
+  class ConvertToBeamRowsSetupSchema:
+    def __init__(self, schema):
+      self._value = schema
+
+    def __enter__(self):
+      if not isinstance(self._value,
+                        (bigquery.TableSchema, bigquery.TableFieldSchema)):
+        return bigquery_tools.get_bq_tableschema(self._value)
+
+      return self._value
+
+    def __exit__(self, *args):
+      pass
+
   class ConvertToBeamRows(PTransform):
     def __init__(self, schema, dynamic_destinations):
-      if not isinstance(schema,
-                        (bigquery.TableSchema, bigquery.TableFieldSchema)):
-        schema = bigquery_tools.get_bq_tableschema(schema)
       self.schema = schema
       self.dynamic_destinations = dynamic_destinations
 
@@ -2775,18 +2786,22 @@ def expand(self, input_dicts):
         return (
             input_dicts
             | "Convert dict to Beam Row" >> beam.Map(
-                lambda row: beam.Row(
-                    **{
-                        StorageWriteToBigQuery.DESTINATION: row[
-                            0], StorageWriteToBigQuery.RECORD: bigquery_tools.
-                        beam_row_from_dict(row[1], self.schema)
-                    })))
+                lambda row, schema=DoFn.SetupContextParam(
+                    StorageWriteToBigQuery.ConvertToBeamRowsSetupSchema, args=
+                    [self.schema]): beam.Row(
+                        **{
+                            StorageWriteToBigQuery.DESTINATION: row[0],
+                            StorageWriteToBigQuery.RECORD: bigquery_tools.
+                            beam_row_from_dict(row[1], schema)
+                        })))
       else:
         return (
             input_dicts
             | "Convert dict to Beam Row" >> beam.Map(
-                lambda row: bigquery_tools.beam_row_from_dict(row, self.schema))
-        )
+                lambda row, schema=DoFn.SetupContextParam(
+                    StorageWriteToBigQuery.ConvertToBeamRowsSetupSchema, args=[
+                        self.schema
+                    ]): bigquery_tools.beam_row_from_dict(row, schema)))
 
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(
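
Note: the patch replaces the eager schema conversion in ConvertToBeamRows.__init__ with a context manager entered through DoFn.SetupContextParam, so a string or dict schema is converted to a TableSchema proto once per DoFn setup on the worker rather than at pipeline-construction time, and the converted proto is no longer captured in the pickled lambda closure. Below is a minimal standalone sketch of the same pattern, assuming (as this diff suggests) that DoFn.SetupContextParam takes a context-manager class plus constructor args and binds the value returned by __enter__ to the lambda's default parameter; the _Lookup class, its path argument, and the loaded data are hypothetical illustrations, not part of this patch.

import apache_beam as beam
from apache_beam import DoFn


class _Lookup:
  """Hypothetical per-worker resource, built once at DoFn setup."""
  def __init__(self, path):
    self._path = path

  def __enter__(self):
    # An expensive or non-picklable load would go here; it runs at worker
    # setup (and is torn down via __exit__), not at pipeline construction.
    return {"a", "b", "c"}

  def __exit__(self, *exc_info):
    pass


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(["a", "x"])
      | beam.Map(
          lambda word,
          table=DoFn.SetupContextParam(_Lookup, args=["/tmp/words.txt"]):
          (word, word in table))
      | beam.Map(print))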