-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Support Timestamp type in xlang JDBC Read and Write #22561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fa0a8d8
9830cfb
711909d
33595f0
e40ece1
b0484e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |||||||||||||||||
| from apache_beam.coders.coders import NullableCoder | ||||||||||||||||||
| from apache_beam.coders.coders import SinglePrecisionFloatCoder | ||||||||||||||||||
| from apache_beam.coders.coders import StrUtf8Coder | ||||||||||||||||||
| from apache_beam.coders.coders import TimestampCoder | ||||||||||||||||||
| from apache_beam.coders.coders import VarIntCoder | ||||||||||||||||||
| from apache_beam.portability import common_urns | ||||||||||||||||||
| from apache_beam.portability.api import schema_pb2 | ||||||||||||||||||
|
|
@@ -168,10 +169,15 @@ def _nonnull_coder_from_type(field_type): | |||||||||||||||||
| _coder_from_type(field_type.map_type.key_type), | ||||||||||||||||||
| _coder_from_type(field_type.map_type.value_type)) | ||||||||||||||||||
| elif type_info == "logical_type": | ||||||||||||||||||
| # Special case for the Any logical type. Just use the default coder for an | ||||||||||||||||||
| # unknown Python object. | ||||||||||||||||||
| if field_type.logical_type.urn == PYTHON_ANY_URN: | ||||||||||||||||||
| # Special case for the Any logical type. Just use the default coder for an | ||||||||||||||||||
| # unknown Python object. | ||||||||||||||||||
| return typecoders.registry.get_coder(object) | ||||||||||||||||||
| elif field_type.logical_type.urn == common_urns.millis_instant.urn: | ||||||||||||||||||
| # Special case for millis instant logical type used to handle Java sdk's | ||||||||||||||||||
| # millis Instant. It explicitly uses TimestampCoder which deals with fix | ||||||||||||||||||
| # length 8-bytes big-endian-long instead of VarInt coder. | ||||||||||||||||||
| return TimestampCoder() | ||||||||||||||||||
|
||||||||||||||||||
| message LogicalTypes { |
beam/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
Lines 449 to 454 in 4799828
| coder: | |
| urn: "beam:coder:row:v1" | |
| # f_timestamp: logical(micros_instant), f_string: string, f_int: int64 | |
| payload: "\n\x7f\n\x0bf_timestamp\x1ap:n\n#beam:logical_type:micros_instant:v1\x1aG2E\nC\n\r\n\x07seconds\x1a\x02\x10\x04\n\x0c\n\x06micros\x1a\x02\x10\x04\x12$4d3f6e8f-7412-4ad7-bfd9-b424a1664aef\n\x0e\n\x08f_string\x1a\x02\x10\x07\n\x0b\n\x05f_int\x1a\x02\x10\x04\x12$33dafd37-397c-4083-a84e-42177d122221" | |
| examples: | |
| "\x03\x00\x02\x00\xb6\x95\xd5\xf9\x05\xc0\xc4\x07\x1b2020-08-13T14:14:14.123456Z\xc0\xf7\x85\xda\xae\x98\xeb\x02": {f_timestamp: {seconds: 1597328054, micros: 123456}, f_string: "2020-08-13T14:14:14.123456Z", f_int: 1597328054123456} |
| var filteredCases = []struct{ filter, reason string }{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the instructions! Will complete the proto change. beam:logical_type:datetime:v1 is the millisecond precision timestamp backed by a fixed length INT64 that encoded with BigEndianLong. Will complete the proto change.
can we do the byte flipping in a to_language_type and to_representation_type implementation?
Considered this before. Under the current framework the value is already decoded with VarInt coder before sent to to_language_type or to_representation_type which is incorrect. All I need is a 8-byte fixed long integer primitive which does not exist in portable primitives, and there is not yet a consensus on reference coder (context). Therefore I ended up this solution to make things work while not introducing new primitives or other fundamental changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah... test case in standard_coders.yaml won't work either because Both micros_instant and millis_instant has Timestamp language type and the former will take over the priority of millis_instant (by design). When a timestamp is decoded to an integer, the test will call micros_instant's to_language_type.
i.e. MillisInstant->Timestamp conversion is one direction and Timestamp <-> MicrosInstant is bidirectional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I'd really like to have this tested at the standard_coders.yaml level as a unit test, it's not ideal if the only thing verifying compatibility is an integration test in Python.
If you have time tomorrow maybe we could do a quick video call to live debug what's going wrong here, and see if we can work around it. If it will be painful we can leave it as future work, but I'd like to understand the level of effort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Managed to get a test case using the following code snippet
def generate_millis():
# Logical type that deals with millis_instant urn (MillisInstant)
MillisLogicalType = LogicalType._known_logical_types.get_logical_type_by_urn('beam:logical_type:millis_instant:v1')
# Original Logical type used to represent Timestamp (MicrosInstant)
TimestampLogicalType = LogicalType._known_logical_types.get_logical_type_by_language_type(Timestamp)
LogicalType._known_logical_types.by_language_type[Timestamp] = MillisLogicalType
schema = beam.typehints.schemas.named_tuple_to_schema(TestTuple)
coder = beam.coders.row_coder.RowCoder(schema)
print("payload = %s" % schema.SerializeToString())
examples = (TestTuple(
f_timestamp=Timestamp.from_rfc3339("2020-08-13T14:14:14.123Z"),
f_string="2020-08-13T14:14:14.123Z",
f_int=1597328054123),)
for example in examples:
print("example = %s" % coder.encode(example))
# recover original registration
LogicalType._known_logical_types.by_language_type[Timestamp] = TimestampLogicalTypeThe workaround is temporarily change the mapping of Timestamp -> MillisInstant logical type. Without it Timestamp always maps to MicrosInstant logical type in Python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahhh ok, so it looks like the issue was using Python to generate the test values. Another strategy might be to create the schema proto directly. But this way works too and is probably less code, thanks!
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |
| bytes <-----> BYTES | ||
| ByteString ------> BYTES | ||
| Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1") | ||
| Timestamp <------ LogicalType(urn="beam:logical_type:millis_instant:v1") | ||
| Mapping <-----> MapType | ||
| Sequence <-----> ArrayType | ||
| NamedTuple <-----> RowType | ||
|
|
@@ -571,13 +572,13 @@ def argument(self): | |
| """Return the argument for this instance of the LogicalType.""" | ||
| raise NotImplementedError() | ||
|
|
||
| def to_representation_type(value): | ||
| def to_representation_type(self, value): | ||
| # type: (LanguageT) -> RepresentationT | ||
|
|
||
| """Convert an instance of LanguageT to RepresentationT.""" | ||
| raise NotImplementedError() | ||
|
|
||
| def to_language_type(value): | ||
| def to_language_type(self, value): | ||
| # type: (RepresentationT) -> LanguageT | ||
|
|
||
| """Convert an instance of RepresentationT to LanguageT.""" | ||
|
|
@@ -587,6 +588,7 @@ def to_language_type(value): | |
| def register_logical_type(cls, logical_type_cls): | ||
| """Register an implementation of LogicalType.""" | ||
| cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls) | ||
| return logical_type_cls | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These places were some minor bugs. If not return logical_type_cls, class decorated with
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! |
||
|
|
||
| @classmethod | ||
| def from_typing(cls, typ): | ||
|
|
@@ -655,9 +657,54 @@ def _from_typing(cls, typ): | |
| ('micros', np.int64)]) | ||
|
|
||
|
|
||
| @LogicalType.register_logical_type | ||
| class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]): | ||
| """Millisecond-precision instant logical type handles values consistent with | ||
| that encoded by ``InstantCoder`` in the Java SDK. | ||
|
|
||
| This class handles :class:`apache_beam.utils.timestamp.Timestamp` language | ||
| type as :class:`MicrosInstant`, but it only provides millisecond precision, | ||
| because it is aimed to handle data encoded by Java sdk's InstantCoder which | ||
| has same precision level. | ||
|
|
||
| Timestamp is handled by `MicrosInstant` by default. In some scenario, such as | ||
| read from cross-language transform with rows containing InstantCoder encoded | ||
| timestamps, one may need to override the mapping of Timetamp to MillisInstant. | ||
| To do this, re-register this class with | ||
| :func:`~LogicalType.register_logical_type`. | ||
| """ | ||
| @classmethod | ||
| def representation_type(cls): | ||
| # type: () -> type | ||
| return np.int64 | ||
|
|
||
| @classmethod | ||
| def urn(cls): | ||
| return common_urns.millis_instant.urn | ||
|
|
||
| @classmethod | ||
| def language_type(cls): | ||
| return Timestamp | ||
|
|
||
| def to_language_type(self, value): | ||
| # type: (np.int64) -> Timestamp | ||
|
|
||
| # value shifted as in apache_beams.coders.coder_impl.TimestampCoderImpl | ||
| if value < 0: | ||
| millis = int(value) + (1 << 63) | ||
| else: | ||
| millis = int(value) - (1 << 63) | ||
|
|
||
| return Timestamp(micros=millis * 1000) | ||
|
|
||
|
|
||
| # Make sure MicrosInstant is registered after MillisInstant so that it | ||
| # overwrites the mapping of Timestamp language type representation choice and | ||
| # thus does not lose microsecond precision inside python sdk. | ||
| @LogicalType.register_logical_type | ||
| class MicrosInstant(NoArgumentLogicalType[Timestamp, | ||
| MicrosInstantRepresentation]): | ||
| """Microsecond-precision instant logical type that handles ``Timestamp``.""" | ||
| @classmethod | ||
| def urn(cls): | ||
| return common_urns.micros_instant.urn | ||
|
|
@@ -683,6 +730,7 @@ def to_language_type(self, value): | |
|
|
||
| @LogicalType.register_logical_type | ||
| class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): | ||
| """A logical type for PythonCallableSource objects.""" | ||
| @classmethod | ||
| def urn(cls): | ||
| return common_urns.python_callable.urn | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish we had a more general solution for mapping these to java time vs. joda time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a mapping from java.time.Instant to java.sql.Timestamp, joda.time is not involved. The comment was a clarification because I see joda.time.Instant is usually used in the codebase (that's why I did not declare import Instant but write down the whole class name here)
Do you think we need a general Row.getJavaInstant() as Row.getDateTime() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, my mistake. Regardless there's no action needed here, I was just lamenting that this date/time type situation is messy.