Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 2 additions & 90 deletions sdks/python/apache_beam/io/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
# pytype: skip-file

import contextlib
import datetime
import typing

import numpy as np
Expand All @@ -96,10 +95,11 @@
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
from apache_beam.typehints.schemas import JdbcDateType # pylint: disable=unused-import
from apache_beam.typehints.schemas import JdbcTimeType # pylint: disable=unused-import
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.typehints.schemas import typing_to_runner_api
from apache_beam.utils.timestamp import Timestamp

__all__ = [
'WriteToJdbc',
Expand Down Expand Up @@ -399,91 +399,3 @@ def __init__(
),
expansion_service or default_io_expansion_service(classpath),
)


@LogicalType.register_logical_type
class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
"""
For internal use only; no backwards-compatibility guarantees.

Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO
has been migrated to Beam portable logical types.
"""
def __init__(self, argument=""):
pass

@classmethod
def representation_type(cls) -> type:
return MillisInstant

@classmethod
def urn(cls):
return "beam:logical_type:javasdk_date:v1"

@classmethod
def language_type(cls):
return datetime.date

def to_representation_type(self, value: datetime.date) -> Timestamp:
return Timestamp.from_utc_datetime(
datetime.datetime.combine(
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.date:
return value.to_utc_datetime().date()

@classmethod
def argument_type(cls):
return str

def argument(self):
return ""

@classmethod
def _from_typing(cls, typ):
return cls()


@LogicalType.register_logical_type
class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
"""
For internal use only; no backwards-compatibility guarantees.

Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java
JDBCIO has been migrated to Beam portable logical types.
"""
def __init__(self, argument=""):
pass

@classmethod
def representation_type(cls) -> type:
return MillisInstant

@classmethod
def urn(cls):
return "beam:logical_type:javasdk_time:v1"

@classmethod
def language_type(cls):
return datetime.time

def to_representation_type(self, value: datetime.date) -> Timestamp:
return Timestamp.from_utc_datetime(
datetime.datetime.combine(
datetime.datetime.utcfromtimestamp(0),
value,
tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.date:
return value.to_utc_datetime().time()

@classmethod
def argument_type(cls):
return str

def argument(self):
return ""

@classmethod
def _from_typing(cls, typ):
return cls()
92 changes: 92 additions & 0 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

# pytype: skip-file

import datetime
import decimal
import logging
from typing import Any
Expand Down Expand Up @@ -1189,3 +1190,94 @@ def argument_type(cls):

def argument(self):
return self.max_length


# TODO: A temporary fix for missing jdbc logical types.
# See the discussion in https://github.com/apache/beam/issues/35738 for
# more detail.
@LogicalType.register_logical_type
class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
"""
For internal use only; no backwards-compatibility guarantees.

Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO
has been migrated to Beam portable logical types.
"""
def __init__(self, argument=""):
pass

@classmethod
def representation_type(cls) -> type:
return MillisInstant

@classmethod
def urn(cls):
return "beam:logical_type:javasdk_date:v1"

@classmethod
def language_type(cls):
return datetime.date

def to_representation_type(self, value: datetime.date) -> Timestamp:
return Timestamp.from_utc_datetime(
datetime.datetime.combine(
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.date:
return value.to_utc_datetime().date()

@classmethod
def argument_type(cls):
return str

def argument(self):
return ""

@classmethod
def _from_typing(cls, typ):
return cls()


@LogicalType.register_logical_type
class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
"""
For internal use only; no backwards-compatibility guarantees.

Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java
JDBCIO has been migrated to Beam portable logical types.
"""
def __init__(self, argument=""):
pass

@classmethod
def representation_type(cls) -> type:
return MillisInstant

@classmethod
def urn(cls):
return "beam:logical_type:javasdk_time:v1"

@classmethod
def language_type(cls):
return datetime.time

def to_representation_type(self, value: datetime.time) -> Timestamp:
return Timestamp.from_utc_datetime(
datetime.datetime.combine(
datetime.datetime.utcfromtimestamp(0),
value,
tzinfo=datetime.timezone.utc))

def to_language_type(self, value: Timestamp) -> datetime.time:
return value.to_utc_datetime().time()

@classmethod
def argument_type(cls):
return str

def argument(self):
return ""

@classmethod
def _from_typing(cls, typ):
return cls()
Loading