From 8bea4db1b21d9c6e95fae73e5e5eb35077775038 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 29 Aug 2025 12:54:21 -0400 Subject: [PATCH 1/5] Fix jdbc logical type not found when python sdk worker running in docker env. --- sdks/python/apache_beam/typehints/schemas.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 32dc2fd06ece..10b7087cf891 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -690,7 +690,13 @@ def add(self, urn, logical_type): self.by_language_type[logical_type.language_type()] = logical_type def get_logical_type_by_urn(self, urn): - return self.by_urn.get(urn, None) + logical_type = self.by_urn.get(urn, None) + if logical_type is None: + if urn in ["beam:logical_type:javasdk_date:v1", + "beam:logical_type:javasdk_time:v1"]: + import apache_beam.io.jdbc # pylint: disable=unused-import + logical_type = self.by_urn.get(urn, None) + return logical_type def get_urn_by_logical_type(self, logical_type): return self.by_logical_type.get(logical_type, None) From 917481b865a890656c83088ebcad2622292c4d78 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 3 Sep 2025 00:00:08 -0400 Subject: [PATCH 2/5] Add TODO. --- sdks/python/apache_beam/typehints/schemas.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 10b7087cf891..97df2fa284f7 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -692,6 +692,9 @@ def add(self, urn, logical_type): def get_logical_type_by_urn(self, urn): logical_type = self.by_urn.get(urn, None) if logical_type is None: + # TODO: A temporary fix for missing jdbc logical types. + # See the discussion in https://github.com/apache/beam/issues/35738 for + # more detail. if urn in ["beam:logical_type:javasdk_date:v1", "beam:logical_type:javasdk_time:v1"]: import apache_beam.io.jdbc # pylint: disable=unused-import From 330a6cc6a3ee6b9e0885eddfc1fcfde320433110 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 3 Sep 2025 11:38:30 -0400 Subject: [PATCH 3/5] Move JdbcTimeType and JdbcDateType to schemas.py. --- sdks/python/apache_beam/io/jdbc.py | 92 +------------------ sdks/python/apache_beam/typehints/schemas.py | 96 +++++++++++++++++++- 2 files changed, 95 insertions(+), 93 deletions(-) diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 79e6b3ce315e..df5d7f21a343 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -87,7 +87,6 @@ # pytype: skip-file import contextlib -import datetime import typing import numpy as np @@ -96,10 +95,11 @@ from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external import ExternalTransform from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +from apache_beam.typehints.schemas import JdbcDateType # pylint: disable=unused-import +from apache_beam.typehints.schemas import JdbcTimeType # pylint: disable=unused-import from apache_beam.typehints.schemas import LogicalType from apache_beam.typehints.schemas import MillisInstant from apache_beam.typehints.schemas import typing_to_runner_api -from apache_beam.utils.timestamp import Timestamp __all__ = [ 'WriteToJdbc', @@ -399,91 +399,3 @@ def __init__( ), expansion_service or default_io_expansion_service(classpath), ) - - -@LogicalType.register_logical_type -class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): - """ - For internal use only; no backwards-compatibility guarantees. - - Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO - has been migrated to Beam portable logical types. - """ - def __init__(self, argument=""): - pass - - @classmethod - def representation_type(cls) -> type: - return MillisInstant - - @classmethod - def urn(cls): - return "beam:logical_type:javasdk_date:v1" - - @classmethod - def language_type(cls): - return datetime.date - - def to_representation_type(self, value: datetime.date) -> Timestamp: - return Timestamp.from_utc_datetime( - datetime.datetime.combine( - value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) - - def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().date() - - @classmethod - def argument_type(cls): - return str - - def argument(self): - return "" - - @classmethod - def _from_typing(cls, typ): - return cls() - - -@LogicalType.register_logical_type -class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]): - """ - For internal use only; no backwards-compatibility guarantees. - - Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java - JDBCIO has been migrated to Beam portable logical types. - """ - def __init__(self, argument=""): - pass - - @classmethod - def representation_type(cls) -> type: - return MillisInstant - - @classmethod - def urn(cls): - return "beam:logical_type:javasdk_time:v1" - - @classmethod - def language_type(cls): - return datetime.time - - def to_representation_type(self, value: datetime.date) -> Timestamp: - return Timestamp.from_utc_datetime( - datetime.datetime.combine( - datetime.datetime.utcfromtimestamp(0), - value, - tzinfo=datetime.timezone.utc)) - - def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().time() - - @classmethod - def argument_type(cls): - return str - - def argument(self): - return "" - - @classmethod - def _from_typing(cls, typ): - return cls() diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 97df2fa284f7..69c7d0cc6349 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -66,6 +66,7 @@ # pytype: skip-file +import datetime import decimal import logging from typing import Any @@ -692,9 +693,7 @@ def add(self, urn, logical_type): def get_logical_type_by_urn(self, urn): logical_type = self.by_urn.get(urn, None) if logical_type is None: - # TODO: A temporary fix for missing jdbc logical types. - # See the discussion in https://github.com/apache/beam/issues/35738 for - # more detail. + if urn in ["beam:logical_type:javasdk_date:v1", "beam:logical_type:javasdk_time:v1"]: import apache_beam.io.jdbc # pylint: disable=unused-import @@ -1198,3 +1197,94 @@ def argument_type(cls): def argument(self): return self.max_length + + +# TODO: A temporary fix for missing jdbc logical types. +# See the discussion in https://github.com/apache/beam/issues/35738 for +# more detail. +@LogicalType.register_logical_type +class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO + has been migrated to Beam portable logical types. + """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls) -> type: + return MillisInstant + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_date:v1" + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value: datetime.date) -> Timestamp: + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value: Timestamp) -> datetime.date: + return value.to_utc_datetime().date() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls() + + +@LogicalType.register_logical_type +class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java + JDBCIO has been migrated to Beam portable logical types. + """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls) -> type: + return MillisInstant + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_time:v1" + + @classmethod + def language_type(cls): + return datetime.time + + def to_representation_type(self, value: datetime.time) -> Timestamp: + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + datetime.datetime.utcfromtimestamp(0), + value, + tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value: Timestamp) -> datetime.time: + return value.to_utc_datetime().time() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls() \ No newline at end of file From 0284dbce1d03e001a5e5034e3009abe92315068c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 3 Sep 2025 12:22:24 -0400 Subject: [PATCH 4/5] Fix lint --- sdks/python/apache_beam/typehints/schemas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 69c7d0cc6349..c9690a968463 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -1287,4 +1287,4 @@ def argument(self): @classmethod def _from_typing(cls, typ): - return cls() \ No newline at end of file + return cls() From dc57470eedb8c921452f4aa6cfa3d643e245859c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 3 Sep 2025 13:12:30 -0400 Subject: [PATCH 5/5] Revert the previous import hack. --- sdks/python/apache_beam/typehints/schemas.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index c9690a968463..c21dde426fc7 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -691,14 +691,7 @@ def add(self, urn, logical_type): self.by_language_type[logical_type.language_type()] = logical_type def get_logical_type_by_urn(self, urn): - logical_type = self.by_urn.get(urn, None) - if logical_type is None: - - if urn in ["beam:logical_type:javasdk_date:v1", - "beam:logical_type:javasdk_time:v1"]: - import apache_beam.io.jdbc # pylint: disable=unused-import - logical_type = self.by_urn.get(urn, None) - return logical_type + return self.by_urn.get(urn, None) def get_urn_by_logical_type(self, logical_type): return self.by_logical_type.get(logical_type, None)