From 19e140c25144fc4ba9e52250449c7b3af242e605 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 5 Aug 2025 15:18:24 -0400 Subject: [PATCH 1/2] Fix timestamp issue in a yaml pipeline that calls SQL transform --- sdks/python/apache_beam/typehints/schemas.py | 8 +++++++- sdks/python/apache_beam/yaml/tests/sql.yaml | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index a3d9e4d8bf73..3cc89d0f2fea 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -151,11 +151,17 @@ def named_fields_to_schema( if isinstance(names_and_types, dict): names_and_types = names_and_types.items() + _ , cached_schema = schema_registry.by_id.get(schema_id, (None, None)) + if cached_schema: + type_by_name_from_schema = {field.name: field.type for field in cached_schema.fields} + else: + type_by_name_from_schema = {} + schema = schema_pb2.Schema( fields=[ schema_pb2.Field( name=name, - type=typing_to_runner_api(type), + type=type_by_name_from_schema.get(name, typing_to_runner_api(type)), options=[ option_to_runner_api(option_tuple) for option_tuple in field_options.get(name, []) diff --git a/sdks/python/apache_beam/yaml/tests/sql.yaml b/sdks/python/apache_beam/yaml/tests/sql.yaml index afa9e834fe93..0040a2790c54 100644 --- a/sdks/python/apache_beam/yaml/tests/sql.yaml +++ b/sdks/python/apache_beam/yaml/tests/sql.yaml @@ -75,3 +75,21 @@ pipelines: - {a: "x", s: "2"} - {a: "x", s: "3"} - {a: "y", s: "10"} + + - pipeline: + type: chain + transforms: + - type: Create + name: CreateSampleData + config: + elements: + - { id: 1, name: "John" } + - { id: 2, name: "Jane" } + - type: Sql + name: sql + config: + query: > + SELECT *, CURRENT_TIMESTAMP AS ingest_timestamp FROM PCOLLECTION + - type: PyTransform + config: + constructor: apache_beam.transforms.util.LogElements From 1a258821b2340575d0e92a3d1ad5655e24b7ca1d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 5 Aug 2025 15:23:44 -0400 Subject: [PATCH 2/2] Apply yapf --- sdks/python/apache_beam/typehints/schemas.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 3cc89d0f2fea..32dc2fd06ece 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -151,9 +151,12 @@ def named_fields_to_schema( if isinstance(names_and_types, dict): names_and_types = names_and_types.items() - _ , cached_schema = schema_registry.by_id.get(schema_id, (None, None)) + _, cached_schema = schema_registry.by_id.get(schema_id, (None, None)) if cached_schema: - type_by_name_from_schema = {field.name: field.type for field in cached_schema.fields} + type_by_name_from_schema = { + field.name: field.type + for field in cached_schema.fields + } else: type_by_name_from_schema = {} @@ -161,7 +164,8 @@ def named_fields_to_schema( fields=[ schema_pb2.Field( name=name, - type=type_by_name_from_schema.get(name, typing_to_runner_api(type)), + type=type_by_name_from_schema.get( + name, typing_to_runner_api(type)), options=[ option_to_runner_api(option_tuple) for option_tuple in field_options.get(name, [])