Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 13
"modification": 12
}

10 changes: 10 additions & 0 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.utils.timestamp import Timestamp

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
Expand Down Expand Up @@ -253,6 +255,10 @@ def test_xlang_jdbc_write_read(self, database):
classpath=config['classpath'],
))

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (
Expand Down Expand Up @@ -349,6 +355,10 @@ def custom_row_equals(expected, actual):
classpath=config['classpath'],
))

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)

# Run read pipeline with custom schema
with TestPipeline() as p:
p.not_use_test_runner_api = True
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/io/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def __init__(self, argument=""):

@classmethod
def representation_type(cls) -> type:
return MillisInstant
return Timestamp

@classmethod
def urn(cls):
Expand All @@ -417,6 +417,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.date:

return value.to_utc_datetime().date()

@classmethod
Expand Down Expand Up @@ -444,7 +445,7 @@ def __init__(self, argument=""):

@classmethod
def representation_type(cls) -> type:
return MillisInstant
return Timestamp

@classmethod
def urn(cls):
Expand All @@ -462,6 +463,7 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.date:

return value.to_utc_datetime().time()

@classmethod
Expand Down
12 changes: 2 additions & 10 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
array_type=schema_pb2.ArrayType(element_type=element_type))

try:
if LogicalType.is_known_logical_type(type_):
logical_type = type_
else:
logical_type = LogicalType.from_typing(type_)
logical_type = LogicalType.from_typing(type_)
except ValueError:
# Unknown type, just treat it like Any
return schema_pb2.FieldType(
Expand Down Expand Up @@ -672,7 +669,7 @@ def add(self, urn, logical_type):
def get_logical_type_by_urn(self, urn):
return self.by_urn.get(urn, None)

def get_urn_by_logical_type(self, logical_type):
def get_urn_by_logial_type(self, logical_type):
return self.by_logical_type.get(logical_type, None)

def get_logical_type_by_language_type(self, representation_type):
Expand Down Expand Up @@ -811,11 +808,6 @@ def from_runner_api(cls, logical_type_proto):
return logical_type()
return logical_type(argument)

@classmethod
def is_known_logical_type(cls, logical_type):
return cls._known_logical_types.get_urn_by_logical_type(
logical_type) is not None


class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
@classmethod
Expand Down
39 changes: 28 additions & 11 deletions sdks/python/apache_beam/yaml/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import argparse
import contextlib
import json
import os
import sys
Expand All @@ -26,6 +27,8 @@
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.transforms import resources
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.yaml import yaml_testing
from apache_beam.yaml import yaml_transform
from apache_beam.yaml import yaml_utils
Expand Down Expand Up @@ -133,12 +136,25 @@ def _pipeline_spec_from_args(known_args):
return pipeline_yaml


@contextlib.contextmanager
def _fix_xlang_instant_coding():
# Scoped workaround for https://github.com/apache/beam/issues/28151.
old_registry = LogicalType._known_logical_types
LogicalType._known_logical_types = old_registry.copy()
try:
LogicalType.register_logical_type(MillisInstant)
yield
finally:
LogicalType._known_logical_types = old_registry


def run(argv=None):
options, constructor, display_data = build_pipeline_components_from_argv(argv)
with beam.Pipeline(options=options, display_data=display_data) as p:
print('Building pipeline...')
constructor(p)
print('Running pipeline...')
with _fix_xlang_instant_coding():
with beam.Pipeline(options=options, display_data=display_data) as p:
print('Building pipeline...')
constructor(p)
print('Running pipeline...')


def run_tests(argv=None, exit=True):
Expand Down Expand Up @@ -169,13 +185,14 @@ def run_tests(argv=None, exit=True):
"If you haven't added a set of tests yet, you can get started by "
'running your pipeline with the --create_test flag enabled.')

tests = [
yaml_testing.YamlTestCase(
pipeline_spec, test_spec, options, known_args.fix_tests)
for test_spec in test_specs
]
suite = unittest.TestSuite(tests)
result = unittest.TextTestRunner().run(suite)
with _fix_xlang_instant_coding():
tests = [
yaml_testing.YamlTestCase(
pipeline_spec, test_spec, options, known_args.fix_tests)
for test_spec in test_specs
]
suite = unittest.TestSuite(tests)
result = unittest.TextTestRunner().run(suite)

if known_args.fix_tests or known_args.create_test:
update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)
Expand Down
Loading