diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index 62620122e598..f65be8d67b9a 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -36,8 +36,8 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.typehints.logical_types import MillisInstant from apache_beam.typehints.schemas import LogicalType -from apache_beam.typehints.schemas import MillisInstant from apache_beam.utils.timestamp import Timestamp # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports diff --git a/sdks/python/apache_beam/typehints/logical_types.py b/sdks/python/apache_beam/typehints/logical_types.py new file mode 100644 index 000000000000..15a6ddb53c04 --- /dev/null +++ b/sdks/python/apache_beam/typehints/logical_types.py @@ -0,0 +1,132 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Standard and common logical type implementations. + +This module is intended for internal use only. 
Nothing defined here provides +any backwards-compatibility guarantee. +""" + +from typing import NamedTuple + +import numpy as np + +from apache_beam.portability import common_urns +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import NoArgumentLogicalType +from apache_beam.utils.python_callable import PythonCallableWithSource +from apache_beam.utils.timestamp import Timestamp + +MicrosInstantRepresentation = NamedTuple( + 'MicrosInstantRepresentation', [('seconds', np.int64), + ('micros', np.int64)]) + + +@LogicalType.register_logical_type +class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]): + """Millisecond-precision instant logical type handles values consistent with + that encoded by ``InstantCoder`` in the Java SDK. + + This class handles :class:`apache_beam.utils.timestamp.Timestamp` language + type as :class:`MicrosInstant`, but it only provides millisecond precision, + because it is aimed to handle data encoded by Java SDK's InstantCoder which + has the same precision level. + + Timestamp is handled by `MicrosInstant` by default. In some scenarios, such + as reading from a cross-language transform with rows containing InstantCoder + encoded timestamps, one may need to override the mapping of Timestamp to + MillisInstant. To do this, re-register this class with + :func:`~apache_beam.typehints.schemas.LogicalType.register_logical_type`. 
+ """ + @classmethod + def representation_type(cls): + # type: () -> type + return np.int64 + + @classmethod + def urn(cls): + return common_urns.millis_instant.urn + + @classmethod + def language_type(cls): + return Timestamp + + def to_language_type(self, value): + # type: (np.int64) -> Timestamp + + # value shifted as in apache_beam.coders.coder_impl.TimestampCoderImpl + if value < 0: + millis = int(value) + (1 << 63) + else: + millis = int(value) - (1 << 63) + + return Timestamp(micros=millis * 1000) + + +# Make sure MicrosInstant is registered after MillisInstant so that it +# overwrites the mapping of Timestamp language type representation choice and +# thus does not lose microsecond precision inside the Python SDK. +@LogicalType.register_logical_type +class MicrosInstant(NoArgumentLogicalType[Timestamp, + MicrosInstantRepresentation]): + """Microsecond-precision instant logical type that handles ``Timestamp``.""" + @classmethod + def urn(cls): + return common_urns.micros_instant.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return MicrosInstantRepresentation + + @classmethod + def language_type(cls): + return Timestamp + + def to_representation_type(self, value): + # type: (Timestamp) -> MicrosInstantRepresentation + return MicrosInstantRepresentation( + value.micros // 1000000, value.micros % 1000000) + + def to_language_type(self, value): + # type: (MicrosInstantRepresentation) -> Timestamp + return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) + + +@LogicalType.register_logical_type +class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): + """A logical type for PythonCallableSource objects.""" + @classmethod + def urn(cls): + return common_urns.python_callable.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return str + + @classmethod + def language_type(cls): + return PythonCallableWithSource + + def to_representation_type(self, value): + # type: 
(PythonCallableWithSource) -> str + return value.get_source() + + def to_language_type(self, value): + # type: (str) -> PythonCallableWithSource + return PythonCallableWithSource(value) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index dc572b4d07dd..269b239761d4 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -71,7 +71,6 @@ import numpy as np from google.protobuf import text_format -from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import row_type from apache_beam.typehints.native_type_compatibility import _get_args @@ -83,8 +82,6 @@ from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY from apache_beam.typehints.schema_registry import SchemaTypeRegistry from apache_beam.utils import proto_utils -from apache_beam.utils.python_callable import PythonCallableWithSource -from apache_beam.utils.timestamp import Timestamp PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1" @@ -652,102 +649,7 @@ def _from_typing(cls, typ): return cls() -MicrosInstantRepresentation = NamedTuple( - 'MicrosInstantRepresentation', [('seconds', np.int64), - ('micros', np.int64)]) - - -@LogicalType.register_logical_type -class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]): - """Millisecond-precision instant logical type handles values consistent with - that encoded by ``InstantCoder`` in the Java SDK. - - This class handles :class:`apache_beam.utils.timestamp.Timestamp` language - type as :class:`MicrosInstant`, but it only provides millisecond precision, - because it is aimed to handle data encoded by Java sdk's InstantCoder which - has same precision level. - - Timestamp is handled by `MicrosInstant` by default. 
In some scenario, such as - read from cross-language transform with rows containing InstantCoder encoded - timestamps, one may need to override the mapping of Timetamp to MillisInstant. - To do this, re-register this class with - :func:`~LogicalType.register_logical_type`. - """ - @classmethod - def representation_type(cls): - # type: () -> type - return np.int64 - - @classmethod - def urn(cls): - return common_urns.millis_instant.urn - - @classmethod - def language_type(cls): - return Timestamp - - def to_language_type(self, value): - # type: (np.int64) -> Timestamp - - # value shifted as in apache_beams.coders.coder_impl.TimestampCoderImpl - if value < 0: - millis = int(value) + (1 << 63) - else: - millis = int(value) - (1 << 63) - - return Timestamp(micros=millis * 1000) - - -# Make sure MicrosInstant is registered after MillisInstant so that it -# overwrites the mapping of Timestamp language type representation choice and -# thus does not lose microsecond precision inside python sdk. 
-@LogicalType.register_logical_type -class MicrosInstant(NoArgumentLogicalType[Timestamp, - MicrosInstantRepresentation]): - """Microsecond-precision instant logical type that handles ``Timestamp``.""" - @classmethod - def urn(cls): - return common_urns.micros_instant.urn - - @classmethod - def representation_type(cls): - # type: () -> type - return MicrosInstantRepresentation - - @classmethod - def language_type(cls): - return Timestamp - - def to_representation_type(self, value): - # type: (Timestamp) -> MicrosInstantRepresentation - return MicrosInstantRepresentation( - value.micros // 1000000, value.micros % 1000000) - - def to_language_type(self, value): - # type: (MicrosInstantRepresentation) -> Timestamp - return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) - - -@LogicalType.register_logical_type -class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): - """A logical type for PythonCallableSource objects.""" - @classmethod - def urn(cls): - return common_urns.python_callable.urn - - @classmethod - def representation_type(cls): - # type: () -> type - return str - - @classmethod - def language_type(cls): - return PythonCallableWithSource - - def to_representation_type(self, value): - # type: (PythonCallableWithSource) -> str - return value.get_source() - - def to_language_type(self, value): - # type: (str) -> PythonCallableWithSource - return PythonCallableWithSource(value) +# pylint: disable=wrong-import-position, unused-import +# register standard and common logical types +import apache_beam.typehints.logical_types +# pylint: enable=wrong-import-position, unused-import