fix(python_adapter): add test for ArroyoDelegate in rust#200
fix(python_adapter): add test for ArroyoDelegate in rust#200
Conversation
| def rust_to_arroyo_msg( | ||
| message: RustMessage, committable: Committable | ||
| ) -> ArroyoMessage[RustMessage]: | ||
| arroyo_committable = { | ||
| Partition(Topic(partition[0]), partition[1]): offset | ||
| for partition, offset in committable.items() | ||
| } | ||
| return ArroyoMessage( | ||
| Value( | ||
| message, | ||
| arroyo_committable, | ||
| datetime.fromtimestamp(message.timestamp) if message.timestamp else None, | ||
| ) | ||
| ) | ||
|
|
||
|
|
||
| def arroyo_msg_to_rust( | ||
| message: ArroyoMessage[Union[FilteredPayload, RustMessage]], | ||
| ) -> Tuple[RustMessage, Committable] | None: | ||
| if isinstance(message.payload, FilteredPayload): | ||
| return None | ||
| committable = { | ||
| (partition.topic.name, partition.index): offset | ||
| for partition, offset in message.committable.items() | ||
| } | ||
| return (message.payload, committable) |
There was a problem hiding this comment.
These seem unnecessary.
They seem 95% copies of these https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py#L46-L91
The difference seems toi be that your RunTask op takes RustMessage and return RustMessage rather than taking and returning a PyAnyMessage.
If you make your function take and return a PyAnyMessage you should be able to avoid these methods entirely and actually test the real logic we run in production instead, which would be recommended.
There was a problem hiding this comment.
I tried originally doing what you're suggesting, but spent >30 min wrestling with mypy to make it happy regarding the TMapIn/TMapOut generics in the ArroyoStrategyDelegate.
I can spend more time on this to get the typing right, it was just a nightmare.
| assert!(res.is_ok()); | ||
|
|
||
| let commit_request = operator.poll(); | ||
| assert!(commit_request.is_ok()); |
There was a problem hiding this comment.
How is this returning OK now that your fix has not been merged yet ?
Followup to #202
This PR adds a test for using the ArroyoDelegate from the PythonAdapter