#12912 Fix KafkaEmitter not emitting queryType for a native query#12915
Conversation
|
I don't know if there's any better idea that callers should not care about handling the serialization in a special way. Another point is this test case |
|
@FrankChen021 Thank you for the comment! Yes, the serialization of the event is very simple, all implementations are Jackson aware, so serialization of Event to JSON is as simple as in this test or in the HttpPostEmitter: objectMapper.writeValueAsString(event)There are tests for checking the However, the problem is that the original KafkaEmitter requires to pass additional property to the output event, which is objectMapper.writeValueAsString(event)The original author created new map based on the The only other solution that comes to my mind is to do:
We won't use |
|
@BartMiki Thank you for the updated information.
Yes, this is the root cause. and seems that there's no way to deal with this problem without user-awareness. Since all changes in this PR are in Kafka extension, it makes senses to me. |
|
@FrankChen021 I will update the logic to use these steps (I think it is a bit more elegant):
|
|
@FrankChen021 I have new findings and I would like your comment on this. If we serialize the map as I've implemented it before, then the output is the same as the previous KafkaEmitter (of course with the If we serialize the whole event to JSON first then for the native query the If I use the second implementation then the output will be different from the original implementation. This may lead to errors in the consumers of the emitted stream. If consumer code distinguishes the SQL from native queries solely by the presence of Example consumer in Python: event = json.loads(rawEvent)
if "query" in event:
# do stuff with Native Query
elif "sql" in event:
# do stuff with SQL QueryIf I leave my implementation as is right now, then the above code would work correctly (backward compatible with the original KafkaEmitter logic). Should I leave the fixed implementation as is right now to maintain this compatibility or should I go with (the perhaps braking) approach of serializing the whole event at once? My take is to leave fix as is to have backward compatibility. What do you think @FrankChen021 ? |
|
I think you can keep the implementation as is right now, because in current case, the additional property map is always not empty, the 2nd implementation seems no gain. |
|
Failed CI task has been re-trigger. I believe it has nothing to do with changes in this PR |
|
Thank you! Ultimately I can rebase from master and push if the check keeps failing. I have also another question @FrankChen021 : Can I backport this fix to the stable branch? How can I do it? What is the best way to address backporting? |
|
Usually issues, such as severe security issues, will be backported. And as I know, currently there's no plan for version like 0.23.x |
868d968 to
56237ef
Compare
|
@BartMiki Thank you for the updating. The change generally LGTM with some minor comments. |
|
@rohangarg Would you like to review this change? |
|
@BartMiki The checkstyle task in CI fails to run. Could you fix it? |
@FrankChen021 : Yes, apologies for the delayed response - got caught up in something else meanwhile. I have posted a small review, should be good from my side once that is done. Thanks for the reminder! 👍 |
Sure, I'm on it |
|
Thank you @FrankChen021 and @rohangarg for your support and engaging discussion! It was a pleasure to work on this with you 👍 |
|
Thank you @BartMiki for your contribution. |
Fixes #12912.
Description
Fixes KafkaEmitter not emitting
queryTypefor a native query. The Event to JSON serialization was extracted to the external class:EventToJsonSerializer. This was done to simplify the testing logic for the serialization as well as extract the responsibility of serialization to the separate class.The logic builds
ObjectNodeincrementally based on the event.toMapmethod. Parsing each entry individually ensures that the Jackson polymorphic annotations are respected. Not respecting these annotation caused the missing of thequeryTypefrom output event.Key changed/added classes in this PR
KafkaEmitter- modifiedEventToJsonSerializer- addedThis PR has: