From 8255e59bf18d46f187616e692f3c0dc66ec54c62 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Jun 2025 10:45:38 -0400 Subject: [PATCH 1/4] Fix a logical type issue about JdbcDateType --- sdks/python/apache_beam/io/jdbc.py | 3 +-- sdks/python/apache_beam/typehints/schemas.py | 10 +++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 32ce16b358f8..90f35c554466 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -401,7 +401,7 @@ def __init__(self, argument=""): @classmethod def representation_type(cls) -> type: - return Timestamp + return MillisInstant @classmethod def urn(cls): @@ -417,7 +417,6 @@ def to_representation_type(self, value: datetime.date) -> Timestamp: value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().date() @classmethod diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index de4cdb9fdf75..b3bbc1bf8122 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -335,7 +335,10 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType: array_type=schema_pb2.ArrayType(element_type=element_type)) try: - logical_type = LogicalType.from_typing(type_) + if LogicalType.is_known_logical_type(type_): + logical_type = type_ + else: + logical_type = LogicalType.from_typing(type_) except ValueError: # Unknown type, just treat it like Any return schema_pb2.FieldType( @@ -808,6 +811,11 @@ def from_runner_api(cls, logical_type_proto): return logical_type() return logical_type(argument) + @classmethod + def is_known_logical_type(cls, logical_type): + return cls._known_logical_types.get_urn_by_logial_type( + logical_type) is not None + class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]): @classmethod From f21e92ed469c1c1eb77fb3e1dd23b3488baa5499 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Jun 2025 14:49:43 -0400 Subject: [PATCH 2/4] Fix typo and also fix the logical class for java time. --- sdks/python/apache_beam/io/jdbc.py | 3 +-- sdks/python/apache_beam/typehints/schemas.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 90f35c554466..604b95f6eebe 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -444,7 +444,7 @@ def __init__(self, argument=""): @classmethod def representation_type(cls) -> type: - return Timestamp + return MillisInstant @classmethod def urn(cls): @@ -462,7 +462,6 @@ def to_representation_type(self, value: datetime.date) -> Timestamp: tzinfo=datetime.timezone.utc)) def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().time() @classmethod diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index b3bbc1bf8122..90a692e21125 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -672,7 +672,7 @@ def add(self, urn, logical_type): def get_logical_type_by_urn(self, urn): return self.by_urn.get(urn, None) - def get_urn_by_logial_type(self, logical_type): + def get_urn_by_logical_type(self, logical_type): return self.by_logical_type.get(logical_type, None) def get_logical_type_by_language_type(self, representation_type): @@ -813,7 +813,7 @@ def from_runner_api(cls, logical_type_proto): @classmethod def is_known_logical_type(cls, logical_type): - return cls._known_logical_types.get_urn_by_logial_type( + return cls._known_logical_types.get_urn_by_logical_type( logical_type) is not None From 9ffb25ea576843f18e4a7a169ca423667b68585b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Jun 2025 22:18:39 -0400 Subject: [PATCH 3/4] Get rid of the workaround on logical type registration. Trigger tests. --- .../trigger_files/beam_PostCommit_Python.json | 2 +- .../io/external/xlang_jdbcio_it_test.py | 8 ----- sdks/python/apache_beam/yaml/main.py | 36 ++++++------------- 3 files changed, 12 insertions(+), 34 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 023697768983..62905b12a707 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 12 + "modification": 13 } diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index 9aed0d5f11d5..22051dde34cd 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -255,10 +255,6 @@ def test_xlang_jdbc_write_read(self, database): classpath=config['classpath'], )) - # Register MillisInstant logical type to override the mapping from Timestamp - # originally handled by MicrosInstant. - LogicalType.register_logical_type(MillisInstant) - with TestPipeline() as p: p.not_use_test_runner_api = True result = ( @@ -355,10 +351,6 @@ def custom_row_equals(expected, actual): classpath=config['classpath'], )) - # Register MillisInstant logical type to override the mapping from Timestamp - # originally handled by MicrosInstant. - LogicalType.register_logical_type(MillisInstant) - # Run read pipeline with custom schema with TestPipeline() as p: p.not_use_test_runner_api = True diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index 35879514ce15..af2cf2279398 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -136,25 +136,12 @@ def _pipeline_spec_from_args(known_args): return pipeline_yaml -@contextlib.contextmanager -def _fix_xlang_instant_coding(): - # Scoped workaround for https://github.com/apache/beam/issues/28151. - old_registry = LogicalType._known_logical_types - LogicalType._known_logical_types = old_registry.copy() - try: - LogicalType.register_logical_type(MillisInstant) - yield - finally: - LogicalType._known_logical_types = old_registry - - def run(argv=None): options, constructor, display_data = build_pipeline_components_from_argv(argv) - with _fix_xlang_instant_coding(): - with beam.Pipeline(options=options, display_data=display_data) as p: - print('Building pipeline...') - constructor(p) - print('Running pipeline...') + with beam.Pipeline(options=options, display_data=display_data) as p: + print('Building pipeline...') + constructor(p) + print('Running pipeline...') def run_tests(argv=None, exit=True): @@ -185,14 +172,13 @@ def run_tests(argv=None, exit=True): "If you haven't added a set of tests yet, you can get started by " 'running your pipeline with the --create_test flag enabled.') - with _fix_xlang_instant_coding(): - tests = [ - yaml_testing.YamlTestCase( - pipeline_spec, test_spec, options, known_args.fix_tests) - for test_spec in test_specs - ] - suite = unittest.TestSuite(tests) - result = unittest.TextTestRunner().run(suite) + tests = [ + yaml_testing.YamlTestCase( + pipeline_spec, test_spec, options, known_args.fix_tests) + for test_spec in test_specs + ] + suite = unittest.TestSuite(tests) + result = unittest.TextTestRunner().run(suite) if known_args.fix_tests or known_args.create_test: update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests) From 50636f1c897df9c38fcf8e9ed308e5aba20ea10c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Jun 2025 22:50:32 -0400 Subject: [PATCH 4/4] Fix lints. --- sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py | 2 -- sdks/python/apache_beam/yaml/main.py | 3 --- 2 files changed, 5 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index 22051dde34cd..9f90a44d9a00 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -36,8 +36,6 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.typehints.schemas import LogicalType -from apache_beam.typehints.schemas import MillisInstant from apache_beam.utils.timestamp import Timestamp # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index af2cf2279398..fbebaea4346f 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -16,7 +16,6 @@ # import argparse -import contextlib import json import os import sys @@ -27,8 +26,6 @@ import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.transforms import resources -from apache_beam.typehints.schemas import LogicalType -from apache_beam.typehints.schemas import MillisInstant from apache_beam.yaml import yaml_testing from apache_beam.yaml import yaml_transform from apache_beam.yaml import yaml_utils