From 10dcf3eba3562896459efa6dd7c6110c0b4b37ec Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Tue, 14 Oct 2025 14:34:06 +0200
Subject: [PATCH 1/5] add setup to convert row Map transform

---
 sdks/python/apache_beam/io/gcp/bigquery.py | 29 ++++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 7310bbdc9fb6..c6f489b855ef 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -365,6 +365,7 @@ def chain_after(result):
 import time
 import uuid
 import warnings
+from contextlib import contextmanager
 from dataclasses import dataclass
 from typing import Dict
 from typing import List
@@ -2770,23 +2771,35 @@ def __init__(self, schema, dynamic_destinations):
       self.schema = schema
       self.dynamic_destinations = dynamic_destinations
 
+    class ConvertToBeamRowsSetupSchema:
+      def __init__(self, value):
+        self._value = value
+
+      def __enter__(self):
+        if not isinstance(self._value,
+                          (bigquery.TableSchema, bigquery.TableFieldSchema)):
+          yield bigquery_tools.get_bq_tableschema(self._value)
+        yield self._value
+
     def expand(self, input_dicts):
       if self.dynamic_destinations:
         return (
            input_dicts
            | "Convert dict to Beam Row" >> beam.Map(
-               lambda row: beam.Row(
-                   **{
-                       StorageWriteToBigQuery.DESTINATION: row[
-                           0], StorageWriteToBigQuery.RECORD: bigquery_tools.
-                       beam_row_from_dict(row[1], self.schema)
-                   })))
+               lambda row, schema=DoFn.SetupContextParam(
+                   ConvertToBeamRowsSetupSchema, args=(self.schema)): beam.Row(
+                   **{
+                       StorageWriteToBigQuery.DESTINATION: row[0],
+                       StorageWriteToBigQuery.RECORD: bigquery_tools.
+                       beam_row_from_dict(row[1], schema)
+                   })))
       else:
         return (
            input_dicts
            | "Convert dict to Beam Row" >> beam.Map(
-               lambda row: bigquery_tools.beam_row_from_dict(row, self.schema))
-        )
+               lambda row, schema=DoFn.SetupContextParam(
+                   ConvertToBeamRowsSetupSchema, args=(self.schema)):
+               bigquery_tools.beam_row_from_dict(row, schema)))
 
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(

From fa0a14fdd04bd92912dedeec2a1762aed69faeb2 Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Tue, 14 Oct 2025 14:35:18 +0200
Subject: [PATCH 2/5] unused import

---
 sdks/python/apache_beam/io/gcp/bigquery.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index c6f489b855ef..938ff2257f95 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -365,7 +365,6 @@ def chain_after(result):
 import time
 import uuid
 import warnings
-from contextlib import contextmanager
 from dataclasses import dataclass
 from typing import Dict
 from typing import List

From b13f60e9c5ea742d68d7c5f687fdbf02f486db27 Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Tue, 14 Oct 2025 15:14:52 +0200
Subject: [PATCH 3/5] lint

---
 sdks/python/apache_beam/io/gcp/bigquery.py | 31 +++++++++++-----------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 938ff2257f95..9694b5c4c23e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2762,31 +2762,29 @@ def expand(self, input):
         failed_rows=failed_rows,
         failed_rows_with_errors=failed_rows_with_errors)
 
+  class ConvertToBeamRowsSetupSchema:
+    def __init__(self, value):
+      self._value = value
+
+    def __enter__(self):
+      if not isinstance(self._value,
+                        (bigquery.TableSchema, bigquery.TableFieldSchema)):
+        yield bigquery_tools.get_bq_tableschema(self._value)
+      yield self._value
+
   class ConvertToBeamRows(PTransform):
     def __init__(self, schema, dynamic_destinations):
-      if not isinstance(schema,
-                        (bigquery.TableSchema, bigquery.TableFieldSchema)):
-        schema = bigquery_tools.get_bq_tableschema(schema)
       self.schema = schema
       self.dynamic_destinations = dynamic_destinations
 
-    class ConvertToBeamRowsSetupSchema:
-      def __init__(self, value):
-        self._value = value
-
-      def __enter__(self):
-        if not isinstance(self._value,
-                          (bigquery.TableSchema, bigquery.TableFieldSchema)):
-          yield bigquery_tools.get_bq_tableschema(self._value)
-        yield self._value
-
     def expand(self, input_dicts):
       if self.dynamic_destinations:
         return (
            input_dicts
            | "Convert dict to Beam Row" >> beam.Map(
               lambda row, schema=DoFn.SetupContextParam(
-                  ConvertToBeamRowsSetupSchema, args=(self.schema)): beam.Row(
+                  StorageWriteToBigQuery.ConvertToBeamRowsSetupSchema, args=
+                  (self.schema)): beam.Row(
                   **{
                       StorageWriteToBigQuery.DESTINATION: row[0],
                       StorageWriteToBigQuery.RECORD: bigquery_tools.
@@ -2797,8 +2795,9 @@ def expand(self, input_dicts):
            input_dicts
            | "Convert dict to Beam Row" >> beam.Map(
               lambda row, schema=DoFn.SetupContextParam(
-                  ConvertToBeamRowsSetupSchema, args=(self.schema)):
-              bigquery_tools.beam_row_from_dict(row, schema)))
+                  StorageWriteToBigQuery.ConvertToBeamRowsSetupSchema, args=
+                  (self.schema)): bigquery_tools.beam_row_from_dict(
+                      row, schema)))
 
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(

From 1c86a8c573708af243257989285032363b4e5191 Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Tue, 14 Oct 2025 19:24:10 +0200
Subject: [PATCH 4/5] add SetupContextParam

---
 sdks/python/apache_beam/io/gcp/bigquery.py | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9694b5c4c23e..c16c5ba95ab6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2763,14 +2763,19 @@ def expand(self, input):
         failed_rows_with_errors=failed_rows_with_errors)
 
   class ConvertToBeamRowsSetupSchema:
-    def __init__(self, value):
-      self._value = value
+    def __init__(self, schema):
+      self._value = schema
 
     def __enter__(self):
       if not isinstance(self._value,
                         (bigquery.TableSchema, bigquery.TableFieldSchema)):
-        yield bigquery_tools.get_bq_tableschema(self._value)
-      yield self._value
+
+        s = bigquery_tools.get_bq_tableschema(self._value)
+        return s
+      return self._value
+
+    def __exit__(self, *args):
+      pass
 
   class ConvertToBeamRows(PTransform):
     def __init__(self, schema, dynamic_destinations):
@@ -2784,7 +2789,7 @@ def expand(self, input_dicts):
            | "Convert dict to Beam Row" >> beam.Map(
               lambda row, schema=DoFn.SetupContextParam(
                   StorageWriteToBigQuery.ConvertToBeamRowsSetupSchema, args=
-                  (self.schema)): beam.Row(
+                  [self.schema]): beam.Row(
                   **{
                       StorageWriteToBigQuery.DESTINATION: row[0],
                       StorageWriteToBigQuery.RECORD: bigquery_tools.
@@ -2795,9 +2800,9 @@ def expand(self, input_dicts):
            input_dicts
            | "Convert dict to Beam Row" >> beam.Map(
               lambda row, schema=DoFn.SetupContextParam(
-                  StorageWriteToBigQuery.ConvertToBeamRowsSetupSchema, args=
-                  (self.schema)): bigquery_tools.beam_row_from_dict(
-                      row, schema)))
+                  StorageWriteToBigQuery.ConvertToBeamRowsSetupSchema, args=[
+                      self.schema
+                  ]): bigquery_tools.beam_row_from_dict(row, schema)))
 
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(

From 4b7a541c30fedf1feca888efa9819673451c4d96 Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Tue, 14 Oct 2025 19:25:33 +0200
Subject: [PATCH 5/5] lint

---
 sdks/python/apache_beam/io/gcp/bigquery.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index c16c5ba95ab6..0ee0822fd300 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2769,9 +2769,8 @@ def __init__(self, schema):
     def __enter__(self):
       if not isinstance(self._value,
                         (bigquery.TableSchema, bigquery.TableFieldSchema)):
+        return bigquery_tools.get_bq_tableschema(self._value)
 
-        s = bigquery_tools.get_bq_tableschema(self._value)
-        return s
       return self._value
 
     def __exit__(self, *args):
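
Note (not part of the patch series): the commits above lean on Beam's DoFn.SetupContextParam, which, as used here, takes a context-manager class, enters it once when the DoFn is set up, and injects the value returned by __enter__ as the lambda's default argument, so the schema conversion in ConvertToBeamRowsSetupSchema runs once per setup instead of once per element. That is also why the later commits replace the yield-based __enter__ with a plain return and add an __exit__: the class is used as an ordinary context manager, not a generator. Below is a minimal, self-contained sketch of the same pattern; the ParsedSchema class and the toy pipeline are hypothetical illustrations, and the DoFn.SetupContextParam(cls, args=[...]) call shape is assumed to match its use in the patches (available in recent Beam releases).

import apache_beam as beam


class ParsedSchema:
  """Hypothetical context manager: does one-time work at DoFn setup."""
  def __init__(self, raw_schema):
    self._raw = raw_schema

  def __enter__(self):
    # Expensive parsing happens once per DoFn setup, not once per element.
    return {"fields": self._raw.split(",")}

  def __exit__(self, *exc_info):
    # Nothing to tear down in this sketch.
    pass


with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create([{"a": 1}, {"a": 2}])
      # The entered ParsedSchema value is bound to `schema` for every element.
      | beam.Map(
          lambda row,
          schema=beam.DoFn.SetupContextParam(ParsedSchema, args=["a:INT64"]):
          (row, schema["fields"]))
      | beam.Map(print))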