diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 62905b12a707..023697768983 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 13 + "modification": 12 } diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index 9f90a44d9a00..9aed0d5f11d5 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -36,6 +36,8 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import MillisInstant from apache_beam.utils.timestamp import Timestamp # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -253,6 +255,10 @@ def test_xlang_jdbc_write_read(self, database): classpath=config['classpath'], )) + # Register MillisInstant logical type to override the mapping from Timestamp + # originally handled by MicrosInstant. + LogicalType.register_logical_type(MillisInstant) + with TestPipeline() as p: p.not_use_test_runner_api = True result = ( @@ -349,6 +355,10 @@ def custom_row_equals(expected, actual): classpath=config['classpath'], )) + # Register MillisInstant logical type to override the mapping from Timestamp + # originally handled by MicrosInstant. + LogicalType.register_logical_type(MillisInstant) + # Run read pipeline with custom schema with TestPipeline() as p: p.not_use_test_runner_api = True diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 604b95f6eebe..32ce16b358f8 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -401,7 +401,7 @@ def __init__(self, argument=""): @classmethod def representation_type(cls) -> type: - return MillisInstant + return Timestamp @classmethod def urn(cls): @@ -417,6 +417,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp: value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) def to_language_type(self, value: Timestamp) -> datetime.date: + return value.to_utc_datetime().date() @classmethod @@ -444,7 +445,7 @@ def __init__(self, argument=""): @classmethod def representation_type(cls) -> type: - return MillisInstant + return Timestamp @classmethod def urn(cls): @@ -462,6 +463,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp: tzinfo=datetime.timezone.utc)) def to_language_type(self, value: Timestamp) -> datetime.date: + return value.to_utc_datetime().time() @classmethod diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 90a692e21125..de4cdb9fdf75 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -335,10 +335,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType: array_type=schema_pb2.ArrayType(element_type=element_type)) try: - if LogicalType.is_known_logical_type(type_): - logical_type = type_ - else: - logical_type = LogicalType.from_typing(type_) + logical_type = LogicalType.from_typing(type_) except ValueError: # Unknown type, just treat it like Any return schema_pb2.FieldType( @@ -672,7 +669,7 @@ def add(self, urn, logical_type): def get_logical_type_by_urn(self, urn): return self.by_urn.get(urn, None) - def get_urn_by_logical_type(self, logical_type): + def get_urn_by_logial_type(self, logical_type): return self.by_logical_type.get(logical_type, None) def get_logical_type_by_language_type(self, representation_type): @@ -811,11 +808,6 @@ def from_runner_api(cls, logical_type_proto): return logical_type() return logical_type(argument) - @classmethod - def is_known_logical_type(cls, logical_type): - return cls._known_logical_types.get_urn_by_logical_type( - logical_type) is not None - class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]): @classmethod diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index fbebaea4346f..35879514ce15 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -16,6 +16,7 @@ # import argparse +import contextlib import json import os import sys @@ -26,6 +27,8 @@ import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.transforms import resources +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import MillisInstant from apache_beam.yaml import yaml_testing from apache_beam.yaml import yaml_transform from apache_beam.yaml import yaml_utils @@ -133,12 +136,25 @@ def _pipeline_spec_from_args(known_args): return pipeline_yaml +@contextlib.contextmanager +def _fix_xlang_instant_coding(): + # Scoped workaround for https://github.com/apache/beam/issues/28151. + old_registry = LogicalType._known_logical_types + LogicalType._known_logical_types = old_registry.copy() + try: + LogicalType.register_logical_type(MillisInstant) + yield + finally: + LogicalType._known_logical_types = old_registry + + def run(argv=None): options, constructor, display_data = build_pipeline_components_from_argv(argv) - with beam.Pipeline(options=options, display_data=display_data) as p: - print('Building pipeline...') - constructor(p) - print('Running pipeline...') + with _fix_xlang_instant_coding(): + with beam.Pipeline(options=options, display_data=display_data) as p: + print('Building pipeline...') + constructor(p) + print('Running pipeline...') def run_tests(argv=None, exit=True): @@ -169,13 +185,14 @@ def run_tests(argv=None, exit=True): "If you haven't added a set of tests yet, you can get started by " 'running your pipeline with the --create_test flag enabled.') - tests = [ - yaml_testing.YamlTestCase( - pipeline_spec, test_spec, options, known_args.fix_tests) - for test_spec in test_specs - ] - suite = unittest.TestSuite(tests) - result = unittest.TextTestRunner().run(suite) + with _fix_xlang_instant_coding(): + tests = [ + yaml_testing.YamlTestCase( + pipeline_spec, test_spec, options, known_args.fix_tests) + for test_spec in test_specs + ] + suite = unittest.TestSuite(tests) + result = unittest.TextTestRunner().run(suite) if known_args.fix_tests or known_args.create_test: update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)