Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 12
"modification": 13
}

10 changes: 0 additions & 10 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.utils.timestamp import Timestamp

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
Expand Down Expand Up @@ -255,10 +253,6 @@ def test_xlang_jdbc_write_read(self, database):
classpath=config['classpath'],
))

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add this back?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, removing that line is the main purpose of this PR. Let's revert the PR for now and I will take a look at that failed test later.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a PR that reverts this one and #35191, which also contributed to the failures of PostCommit Python XLang GCP Direct.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I understand it, your fix avoids the need for this workaround when working with the date (beam:logical_type:javasdk_date:v1) and time (beam:logical_type:javasdk_time:v1) types.

For JDBC timestamp types, Java encodes the value as beam:logical_type:millis_instant:v1, which Python interprets as the language type Timestamp, but MicrosInstant is the logical type registered to handle Timestamp?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the two logical types MillisInstant (

class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]):
) and MicrosInstant (
class MicrosInstant(NoArgumentLogicalType[Timestamp,
) both register Timestamp as their language type; MillisInstant is defined first in the file and MicrosInstant second.

Internally there is a map from each language type to its logical type, and it is used when determining the coder. Without my fix and without the workaround, that internal map resolves Timestamp to MicrosInstant.

So the whole code path when determining the coder for MillisInstant becomes MillisInstant -> Timestamp -> logical type of Timestamp -> MicrosInstant and its representation type.

The workaround is to call the register function again for MillisInstant, so that it overwrites the internal mapping with Timestamp -> MillisInstant.

My fix was trying to eliminate the need for the workaround.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we read from jdbc, we get a schema with a millis_instant urn e.g.

fields {
  name: "created_at"
  type {
    nullable: true
    logical_type {
      urn: "beam:logical_type:millis_instant:v1"
      representation {
        atomic_type: INT64
      }
    }
  }
  id: 2
  encoding_position: 2
}

Then, in named_tuple_from_schema, we try to map the schema fields to Python types:

return LogicalType.from_runner_api(
.

[('created_at', typing.Optional[apache_beam.typehints.schemas.Timestamp])]

The coder for this tuple will use MicrosInstant because that was registered last for Timestamp.

I am testing a fix here to use a stub type so that
beam:logical_type:millis_instant:v1 -> Stub Type -> MillisInstant -> Timestamp
#35400


with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (
Expand Down Expand Up @@ -355,10 +349,6 @@ def custom_row_equals(expected, actual):
classpath=config['classpath'],
))

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)

# Run read pipeline with custom schema
with TestPipeline() as p:
p.not_use_test_runner_api = True
Expand Down
6 changes: 2 additions & 4 deletions sdks/python/apache_beam/io/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def __init__(self, argument=""):

@classmethod
def representation_type(cls) -> type:
return Timestamp
return MillisInstant

@classmethod
def urn(cls):
Expand All @@ -417,7 +417,6 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.date:

return value.to_utc_datetime().date()

@classmethod
Expand Down Expand Up @@ -445,7 +444,7 @@ def __init__(self, argument=""):

@classmethod
def representation_type(cls) -> type:
return Timestamp
return MillisInstant

@classmethod
def urn(cls):
Expand All @@ -463,7 +462,6 @@ def to_representation_type(self, value: datetime.date) -> Timestamp:
tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.date:

return value.to_utc_datetime().time()

@classmethod
Expand Down
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
array_type=schema_pb2.ArrayType(element_type=element_type))

try:
logical_type = LogicalType.from_typing(type_)
if LogicalType.is_known_logical_type(type_):
logical_type = type_
else:
logical_type = LogicalType.from_typing(type_)
except ValueError:
# Unknown type, just treat it like Any
return schema_pb2.FieldType(
Expand Down Expand Up @@ -669,7 +672,7 @@ def add(self, urn, logical_type):
def get_logical_type_by_urn(self, urn):
return self.by_urn.get(urn, None)

def get_urn_by_logial_type(self, logical_type):
def get_urn_by_logical_type(self, logical_type):
return self.by_logical_type.get(logical_type, None)

def get_logical_type_by_language_type(self, representation_type):
Expand Down Expand Up @@ -808,6 +811,11 @@ def from_runner_api(cls, logical_type_proto):
return logical_type()
return logical_type(argument)

@classmethod
def is_known_logical_type(cls, logical_type):
return cls._known_logical_types.get_urn_by_logical_type(
logical_type) is not None


class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
@classmethod
Expand Down
39 changes: 11 additions & 28 deletions sdks/python/apache_beam/yaml/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

import argparse
import contextlib
import json
import os
import sys
Expand All @@ -27,8 +26,6 @@
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.transforms import resources
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.yaml import yaml_testing
from apache_beam.yaml import yaml_transform
from apache_beam.yaml import yaml_utils
Expand Down Expand Up @@ -136,25 +133,12 @@ def _pipeline_spec_from_args(known_args):
return pipeline_yaml


@contextlib.contextmanager
def _fix_xlang_instant_coding():
# Scoped workaround for https://github.com/apache/beam/issues/28151.
old_registry = LogicalType._known_logical_types
LogicalType._known_logical_types = old_registry.copy()
try:
LogicalType.register_logical_type(MillisInstant)
yield
finally:
LogicalType._known_logical_types = old_registry


def run(argv=None):
options, constructor, display_data = build_pipeline_components_from_argv(argv)
with _fix_xlang_instant_coding():
with beam.Pipeline(options=options, display_data=display_data) as p:
print('Building pipeline...')
constructor(p)
print('Running pipeline...')
with beam.Pipeline(options=options, display_data=display_data) as p:
print('Building pipeline...')
constructor(p)
print('Running pipeline...')


def run_tests(argv=None, exit=True):
Expand Down Expand Up @@ -185,14 +169,13 @@ def run_tests(argv=None, exit=True):
"If you haven't added a set of tests yet, you can get started by "
'running your pipeline with the --create_test flag enabled.')

with _fix_xlang_instant_coding():
tests = [
yaml_testing.YamlTestCase(
pipeline_spec, test_spec, options, known_args.fix_tests)
for test_spec in test_specs
]
suite = unittest.TestSuite(tests)
result = unittest.TextTestRunner().run(suite)
tests = [
yaml_testing.YamlTestCase(
pipeline_spec, test_spec, options, known_args.fix_tests)
for test_spec in test_specs
]
suite = unittest.TestSuite(tests)
result = unittest.TextTestRunner().run(suite)

if known_args.fix_tests or known_args.create_test:
update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)
Expand Down
Loading