diff --git a/CHANGES.md b/CHANGES.md index 76f765104525..ee6a0238063f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,6 +77,8 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed Python cross-language JDBC IO Connector cannot read or write rows containing Timestamp type values [19817](https://github.com/apache/beam/issues/19817). + ## Known Issues * ([#X](https://github.com/apache/beam/issues/X)). diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index 38ce3355860a..a4482328e768 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -391,7 +391,7 @@ examples: # import typing # import apache_beam as beam # class Test(typing.NamedTuple): -# f_map: typing.Mapping[str,int] +# f_map: typing.Mapping[str,typing.Optional[int]] # schema = beam.typehints.schemas.named_tuple_to_schema(Test) # coder = beam.coders.row_coder.RowCoder(schema) # print("payload = %s" % schema.SerializeToString()) @@ -453,6 +453,13 @@ coder: examples: "\x03\x00\x02\x00\xb6\x95\xd5\xf9\x05\xc0\xc4\x07\x1b2020-08-13T14:14:14.123456Z\xc0\xf7\x85\xda\xae\x98\xeb\x02": {f_timestamp: {seconds: 1597328054, micros: 123456}, f_string: "2020-08-13T14:14:14.123456Z", f_int: 1597328054123456} +coder: + urn: "beam:coder:row:v1" + # f_timestamp: logical(millis_instant), f_string: string, f_int: int64 + payload: "\n:\n\x0bf_timestamp\x1a+:)\n#beam:logical_type:millis_instant:v1\x1a\x02\x10\x04\n\x0e\n\x08f_string\x1a\x02\x10\x07\n\x0b\n\x05f_int\x1a\x02\x10\x04\x12$80be749a-5700-4ede-89d8-dd9a4433a3f8" +examples: + "\x03\x00\x80\x00\x01s\xe8+\xd7k\x182020-08-13T14:14:14.123Z\xeb\xae\xaf\xc1\xbe.": {f_timestamp: -9223370439526721685, f_string: "2020-08-13T14:14:14.123Z", f_int: 1597328054123} + --- coder: diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index 3a6a79a6e2ea..20de76d53dac 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -130,6 +130,15 @@ message LogicalTypes { // amount of time since the epoch. MICROS_INSTANT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:logical_type:micros_instant:v1"]; + + // A URN for MillisInstant type + // - Representation type: INT64 + // - A timestamp without a timezone represented by the number of + // milliseconds since the epoch. The INT64 value is encoded with + // big-endian shifted such that lexicographic ordering of the bytes + // corresponds to chronological order. + MILLIS_INSTANT = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:millis_instant:v1"]; } } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java index c3f173aabe19..46131826ea8c 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java @@ -422,6 +422,9 @@ private static Object parseField(Object value, Schema.FieldType fieldType) { return (String) value; case BOOLEAN: return (Boolean) value; + case DATETIME: + // convert shifted millis to epoch millis as in InstantCoder + return new Instant((Long) value + -9223372036854775808L); case BYTES: // extract String as byte[] return ((String) value).getBytes(StandardCharsets.ISO_8859_1); @@ -465,7 +468,7 @@ private static Object parseField(Object value, Schema.FieldType fieldType) { return fieldType .getLogicalType() .toInputType(parseField(value, fieldType.getLogicalType().getBaseType())); - default: // DECIMAL, DATETIME + default: // DECIMAL throw new IllegalArgumentException("Unsupported type name: " + fieldType.getTypeName()); } } diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go index 4ea210c7ee83..61c4718bdcec 100644 --- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go +++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go @@ -51,6 +51,7 @@ var unimplementedCoders = map[string]bool{ var filteredCases = []struct{ filter, reason string }{ {"logical", "BEAM-9615: Support logical types"}, {"30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", "https://github.com/apache/beam/issues/21206: Support encoding position."}, + {"80be749a-5700-4ede-89d8-dd9a4433a3f8", "https://github.com/apache/beam/issues/19817: Support millis_instant."}, } // Coder is a representation a serialized beam coder. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index 22ae0071a4a5..c1667cc9684c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -28,6 +28,7 @@ import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.model.pipeline.v1.SchemaApi.ArrayTypeValue; import org.apache.beam.model.pipeline.v1.SchemaApi.AtomicTypeValue; @@ -64,9 +65,13 @@ }) public class SchemaTranslation { - private static final String URN_BEAM_LOGICAL_DATETIME = "beam:logical_type:datetime:v1"; private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:logical_type:decimal:v1"; private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1"; + private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT = + SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); // TODO(https://github.com/apache/beam/issues/19715): Populate this with a LogicalTypeRegistrar, // which includes a way to construct @@ -198,7 +203,7 @@ private static SchemaApi.FieldType fieldTypeToProto( case DATETIME: builder.setLogicalType( SchemaApi.LogicalType.newBuilder() - .setUrn(URN_BEAM_LOGICAL_DATETIME) + .setUrn(URN_BEAM_LOGICAL_MILLIS_INSTANT) .setRepresentation(fieldTypeToProto(FieldType.INT64, serializeLogicalType)) .build()); break; @@ -358,7 +363,7 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p } // Special-case for DATETIME and DECIMAL which are logical types in portable representation, // but not yet in Java. (https://github.com/apache/beam/issues/19817) - if (urn.equals(URN_BEAM_LOGICAL_DATETIME)) { + if (urn.equals(URN_BEAM_LOGICAL_MILLIS_INSTANT)) { return FieldType.DATETIME; } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) { return FieldType.DECIMAL; diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index f4a5169c77bc..644d3d801a5d 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadWithPartitions; import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; @@ -189,6 +190,18 @@ static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller( } String logicalTypeName = fieldType.getLogicalType().getIdentifier(); + + if (logicalTypeName.equals(MicrosInstant.IDENTIFIER)) { + // Process timestamp of MicrosInstant kind, which should only be passed from other type + // systems such as SQL and other Beam SDKs. + return (element, ps, i, fieldWithIndex) -> { + // MicrosInstant uses native java.time.Instant instead of joda.Instant. + java.time.Instant value = + element.getLogicalTypeValue(fieldWithIndex.getIndex(), java.time.Instant.class); + ps.setTimestamp(i + 1, value == null ? null : new Timestamp(value.toEpochMilli())); + }; + } + JDBCType jdbcType = JDBCType.valueOf(logicalTypeName); switch (jdbcType) { case DATE: diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py index 1b434d68b479..d8c07306b9b9 100644 --- a/sdks/python/apache_beam/coders/row_coder.py +++ b/sdks/python/apache_beam/coders/row_coder.py @@ -32,6 +32,7 @@ from apache_beam.coders.coders import NullableCoder from apache_beam.coders.coders import SinglePrecisionFloatCoder from apache_beam.coders.coders import StrUtf8Coder +from apache_beam.coders.coders import TimestampCoder from apache_beam.coders.coders import VarIntCoder from apache_beam.portability import common_urns from apache_beam.portability.api import schema_pb2 @@ -168,10 +169,15 @@ def _nonnull_coder_from_type(field_type): _coder_from_type(field_type.map_type.key_type), _coder_from_type(field_type.map_type.value_type)) elif type_info == "logical_type": - # Special case for the Any logical type. Just use the default coder for an - # unknown Python object. if field_type.logical_type.urn == PYTHON_ANY_URN: + # Special case for the Any logical type. Just use the default coder for an + # unknown Python object. return typecoders.registry.get_coder(object) + elif field_type.logical_type.urn == common_urns.millis_instant.urn: + # Special case for millis instant logical type used to handle Java sdk's + # millis Instant. It explicitly uses TimestampCoder which deals with fix + # length 8-bytes big-endian-long instead of VarInt coder. + return TimestampCoder() logical_type = LogicalType.from_runner_api(field_type.logical_type) return LogicalTypeCoder( diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index c81adf8dd945..62620122e598 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -17,12 +17,15 @@ # pytype: skip-file +import datetime import logging +import time import typing import unittest from typing import Callable from typing import Union +import pytz from parameterized import parameterized import apache_beam as beam @@ -33,6 +36,9 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import MillisInstant +from apache_beam.utils.timestamp import Timestamp # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -53,19 +59,14 @@ JdbcReadTestRow = typing.NamedTuple( "JdbcReadTestRow", - [ - ("f_int", int), - ], + [("f_int", int), ("f_timestamp", Timestamp)], ) coders.registry.register_coder(JdbcReadTestRow, coders.RowCoder) JdbcWriteTestRow = typing.NamedTuple( "JdbcWriteTestRow", - [ - ("f_id", int), - ("f_real", float), - ("f_string", str), - ], + [("f_id", int), ("f_real", float), ("f_string", str), + ("f_timestamp", Timestamp)], ) coders.registry.register_coder(JdbcWriteTestRow, coders.RowCoder) @@ -127,10 +128,15 @@ def test_xlang_jdbc_write(self, database): self._setUpTestCase(container_init, db_string, driver) table_name = 'jdbc_external_test_write' self.engine.execute( - "CREATE TABLE {}(f_id INTEGER, f_real FLOAT, f_string VARCHAR(100))". - format(table_name)) + "CREATE TABLE {}(f_id INTEGER, f_real FLOAT, f_string VARCHAR(100), f_timestamp TIMESTAMP(3))" # pylint: disable=line-too-long + .format(table_name)) inserted_rows = [ - JdbcWriteTestRow(i, i + 0.1, 'Test{}'.format(i)) + JdbcWriteTestRow( + i, + i + 0.1, + 'Test{}'.format(i), + # In alignment with Java Instant which supports milli precision. + Timestamp.of(seconds=round(time.time(), 3))) for i in range(ROW_COUNT) ] @@ -152,7 +158,11 @@ def test_xlang_jdbc_write(self, database): fetched_data = self.engine.execute("SELECT * FROM {}".format(table_name)) fetched_rows = [ - JdbcWriteTestRow(int(row[0]), float(row[1]), str(row[2])) + JdbcWriteTestRow( + int(row[0]), + float(row[1]), + str(row[2]), + Timestamp.from_utc_datetime(row[3].replace(tzinfo=pytz.UTC))) for row in fetched_data ] @@ -168,10 +178,25 @@ def test_xlang_jdbc_read(self, database): CrossLanguageJdbcIOTest.DB_CONTAINER_CLASSPATH_STRING[database]) self._setUpTestCase(container_init, db_string, driver) table_name = 'jdbc_external_test_read' - self.engine.execute("CREATE TABLE {}(f_int INTEGER)".format(table_name)) + self.engine.execute( + "CREATE TABLE {}(f_int INTEGER, f_timestamp TIMESTAMP)".format( + table_name)) + all_timestamps = [] for i in range(ROW_COUNT): - self.engine.execute("INSERT INTO {} VALUES({})".format(table_name, i)) + # prepare timestamp + strtime = Timestamp.now().to_utc_datetime().strftime('%Y-%m-%dT%H:%M:%S') + dttime = datetime.datetime.strptime( + strtime, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=pytz.UTC) + all_timestamps.append(Timestamp.from_utc_datetime(dttime)) + + # write records using sqlalchemy engine + self.engine.execute( + "INSERT INTO {} VALUES({},'{}')".format(table_name, i, strtime)) + + # Register MillisInstant logical type to override the mapping from Timestamp + # originally handled by MicrosInstant. + LogicalType.register_logical_type(MillisInstant) with TestPipeline() as p: p.not_use_test_runner_api = True @@ -188,7 +213,10 @@ def test_xlang_jdbc_read(self, database): classpath=classpath)) assert_that( - result, equal_to([JdbcReadTestRow(i) for i in range(ROW_COUNT)])) + result, + equal_to([ + JdbcReadTestRow(i, all_timestamps[i]) for i in range(ROW_COUNT) + ])) # Creating a container with testcontainers sometimes raises ReadTimeout # error. In java there are 2 retries set by default. diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 5e8a3ce4cce1..e5788b1ad217 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -80,4 +80,5 @@ java_class_lookup = ExpansionMethods.Enum.JAVA_CLASS_LOOKUP micros_instant = LogicalTypes.Enum.MICROS_INSTANT +millis_instant = LogicalTypes.Enum.MILLIS_INSTANT python_callable = LogicalTypes.Enum.PYTHON_CALLABLE diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 156242745d36..dc572b4d07dd 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -34,6 +34,7 @@ bytes <-----> BYTES ByteString ------> BYTES Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1") + Timestamp <------ LogicalType(urn="beam:logical_type:millis_instant:v1") Mapping <-----> MapType Sequence <-----> ArrayType NamedTuple <-----> RowType @@ -571,13 +572,13 @@ def argument(self): """Return the argument for this instance of the LogicalType.""" raise NotImplementedError() - def to_representation_type(value): + def to_representation_type(self, value): # type: (LanguageT) -> RepresentationT """Convert an instance of LanguageT to RepresentationT.""" raise NotImplementedError() - def to_language_type(value): + def to_language_type(self, value): # type: (RepresentationT) -> LanguageT """Convert an instance of RepresentationT to LanguageT.""" @@ -587,6 +588,7 @@ def to_language_type(value): def register_logical_type(cls, logical_type_cls): """Register an implementation of LogicalType.""" cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls) + return logical_type_cls @classmethod def from_typing(cls, typ): @@ -655,9 +657,54 @@ def _from_typing(cls, typ): ('micros', np.int64)]) +@LogicalType.register_logical_type +class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]): + """Millisecond-precision instant logical type handles values consistent with + that encoded by ``InstantCoder`` in the Java SDK. + + This class handles :class:`apache_beam.utils.timestamp.Timestamp` language + type as :class:`MicrosInstant`, but it only provides millisecond precision, + because it is aimed to handle data encoded by Java sdk's InstantCoder which + has same precision level. + + Timestamp is handled by `MicrosInstant` by default. In some scenario, such as + read from cross-language transform with rows containing InstantCoder encoded + timestamps, one may need to override the mapping of Timetamp to MillisInstant. + To do this, re-register this class with + :func:`~LogicalType.register_logical_type`. + """ + @classmethod + def representation_type(cls): + # type: () -> type + return np.int64 + + @classmethod + def urn(cls): + return common_urns.millis_instant.urn + + @classmethod + def language_type(cls): + return Timestamp + + def to_language_type(self, value): + # type: (np.int64) -> Timestamp + + # value shifted as in apache_beams.coders.coder_impl.TimestampCoderImpl + if value < 0: + millis = int(value) + (1 << 63) + else: + millis = int(value) - (1 << 63) + + return Timestamp(micros=millis * 1000) + + +# Make sure MicrosInstant is registered after MillisInstant so that it +# overwrites the mapping of Timestamp language type representation choice and +# thus does not lose microsecond precision inside python sdk. @LogicalType.register_logical_type class MicrosInstant(NoArgumentLogicalType[Timestamp, MicrosInstantRepresentation]): + """Microsecond-precision instant logical type that handles ``Timestamp``.""" @classmethod def urn(cls): return common_urns.micros_instant.urn @@ -683,6 +730,7 @@ def to_language_type(self, value): @LogicalType.register_logical_type class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): + """A logical type for PythonCallableSource objects.""" @classmethod def urn(cls): return common_urns.python_callable.urn