KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader#9549
tombentley merged 5 commits into apache:trunk
@kkonstantine would you be able to review this? These SMTs were originally specified in KIP-145 but never implemented. It seems they were forgotten about. @C0urante maybe you'd also like to review?
Thanks for reaching out @tombentley! Happy to take a look. For reference, maybe you could add a link to https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect#KIP145ExposeRecordHeadersinKafkaConnect-Transformations to the description? Will begin reviewing shortly.
C0urante left a comment
Looking pretty good! Will check out the tests after the core functional changes are settled.
Might make sense to refer to the MOVE.name and COPY.name fields declared in the Operation enum below instead of using the literal "move" and "copy" strings in this section.
MOVE.name is not a compile time constant (at least the compiler doesn't see it that way). I factored out constants to avoid the duplication of literals.
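One place where the distinction matters is a `switch` on strings, whose case labels must be constant expressions. The following is a minimal sketch of that pattern, not the code from this PR: the enclosing class `HeaderOps` is hypothetical, and the constant names are borrowed from later comments in this thread.

```java
// Hypothetical sketch; HeaderOps is an illustrative name, not a class from the PR.
final class HeaderOps {
    // static final Strings initialised from literals are constant variables,
    // so the compiler accepts them as switch case labels.
    static final String MOVE_OPERATION = "move";
    static final String COPY_OPERATION = "copy";

    enum Operation {
        MOVE(MOVE_OPERATION),
        COPY(COPY_OPERATION);

        final String name;

        Operation(String name) {
            this.name = name;
        }

        static Operation fromName(String name) {
            switch (name) {
                case MOVE_OPERATION: // compiles: constant expression
                    return MOVE;
                case COPY_OPERATION:
                    return COPY;
                // `case MOVE.name:` would be rejected here, because an
                // instance field of an enum value is not a constant expression.
                default:
                    throw new IllegalArgumentException("Unknown operation: " + name);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(Operation.fromName("copy"));
    }
}
```

The literals appear exactly once, and both the enum and any validation code can refer to the same two constants.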
Why duplicate headers here? According to the Headers class's Javadocs, the collection should be mutable.
I wasn't completely sure whether a transformation was allowed to mutate headers, since it has to create a new ConnectRecord. Take this code from WorkerSourceTask as an example:

    final SourceRecord record = transformationChain.apply(preTransformRecord);
    final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
    if (producerRecord == null || retryWithToleranceOperator.failed()) {
        counter.skipRecord();
        commitTaskRecord(preTransformRecord, null);
        continue;
    }

See how preTransformRecord can be used after the transformation chain has been applied? I think it would be incorrect for commitTaskRecord to commit the original record but with headers which had been mutated by transformations, right?
Ah, that's fair. Does seem to be the pattern followed by the other out-of-the-box transformations as well; probably best to continue to follow that pattern.
I'm a little unnerved by this though, since as far as I can tell it's not publicly documented and so it's possible people writing their own transformations may be violating this implicit rule.
Out of scope, so I've filed KAFKA-10720 to track the need for possible documentation improvements.
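The implicit rule discussed above (copy the headers, then build a new record) can be illustrated with a small stand-in model. `SimpleRecord` and `AddHeaderSketch` are hypothetical types invented for this sketch; they are not the Kafka Connect classes, and headers are reduced to plain names to keep it short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in type for illustration only; NOT a Kafka Connect class.
final class SimpleRecord {
    final String value;
    final List<String> headers; // header names only, to keep the sketch small

    SimpleRecord(String value, List<String> headers) {
        this.value = value;
        this.headers = headers;
    }
}

final class AddHeaderSketch {
    // Copies the header list before changing it, so the caller's
    // pre-transform record still sees its original headers.
    static SimpleRecord apply(SimpleRecord in, String newHeader) {
        List<String> copy = new ArrayList<>(in.headers);
        copy.add(newHeader);
        return new SimpleRecord(in.value, copy);
    }

    public static void main(String[] args) {
        SimpleRecord pre = new SimpleRecord("v", new ArrayList<>(Arrays.asList("a")));
        SimpleRecord post = apply(pre, "b");
        System.out.println(pre.headers + " -> " + post.headers);
    }
}
```

In Connect itself, `Headers.duplicate()` and the `newRecord` overload that accepts headers appear to fill the roles of the list copy and the constructor call here.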
This looks fairly expensive to perform for every record. Do you think it might make sense to perform some caching, similarly to what's done in the ReplaceField transform?
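The comment does not say which computation is meant, so the following is only a sketch of the general idea: if the expensive result is a pure function of some per-record key (a schema, for example), it can be computed once and reused. `DerivedValueCache` and its `computations` counter are hypothetical names, and the bounded `LinkedHashMap` stands in for whatever cache the real transform would use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: a small, synchronized, least-recently-used cache.
final class DerivedValueCache<K, V> {
    private final Map<K, V> cache;
    int computations = 0; // exposed only so the effect of caching is observable

    DerivedValueCache(final int maxSize) {
        // accessOrder=true makes iteration order least-recently-used first
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Returns the cached value for key, computing and storing it on a miss.
    synchronized V get(K key, Function<K, V> compute) {
        V value = cache.get(key);
        if (value == null) {
            value = compute.apply(key);
            computations++;
            cache.put(key, value);
        }
        return value;
    }
}
```

A transform would hold one such cache as a field and call `get(key, expensiveFunction)` per record, so the expensive function runs once per distinct key rather than once per record.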
Same question here RE: duplication
Same question here RE: duplication
@C0urante thanks for the review, some really helpful comments there.
Little silly to run this for every iteration of the parameterized test 😛
I'm guessing there isn't an easy way to run this only once?
Yeah, parameterized tests do have their faults. AFAIK the only way to handle this using JUnit 4 is to have two tests. You can arrange to have both tests in the same source file so both parameterized and non-parameterized tests are still easily discoverable (e.g. using Enclosed), but I've never seen that done in practice and it looks rather fiddly. Since these tests execute pretty quickly, I'm inclined to think we should just live with it. The real solution might just be to adopt JUnit 5.
C0urante left a comment
LGTM, a few small comments but none of them are blockers. Thanks Tom!
@kkonstantine or perhaps @rhauch, please could one of you take a look?
@kkonstantine, @rhauch did you get a chance to look at this?
e426dbf to 1a79ffd
mimaison left a comment
Thanks for the PR! Good to see these SMTs finally being added.
I've made a first quick pass (haven't looked at HeaderFrom yet) and left a few suggestions
We don't have this transformation!
Because Headers is a LinkedList, remove() has to iterate the whole list each time. I wonder if we could instead start from an empty headers list and add the headers not being removed?
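The suggestion can be sketched on plain collections: rather than calling a remove operation once per dropped name, walk the input once and append everything that is being kept to a fresh list. `DropHeadersSketch` is a hypothetical name, and header names stand in for full header objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch on plain strings; not the Kafka Connect Headers API.
final class DropHeadersSketch {
    // Single pass over the input: each name costs one set lookup,
    // and the input list is left unmodified.
    static List<String> retainOthers(List<String> headerNames, Set<String> toDrop) {
        List<String> kept = new ArrayList<>();
        for (String name : headerNames) {
            if (!toDrop.contains(name)) {
                kept.add(name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("trace", "auth", "trace", "tenant");
        Set<String> drop = new HashSet<>(Arrays.asList("trace"));
        System.out.println(retainOthers(names, drop));
    }
}
```

Because the result is a new list, this approach also satisfies the copy-rather-than-mutate convention discussed earlier in the review.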
Should we enforce these fields are not null?
I guess that makes sense @mimaison, but then for consistency shouldn't we make DropHeaders.headers, HeaderFrom.headers and HeaderFrom.fields reject empty lists?
Should we reuse MOVE_OPERATION and COPY_OPERATION here?
1a79ffd to a211f1e
@mimaison I went ahead and enforced non-nullness and non-emptiness, if you want to do another pass?
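For readers unfamiliar with the pattern, a non-null and non-empty check on a list-valued setting might look roughly like the sketch below. `ListChecks` is a hypothetical helper, and it throws `IllegalArgumentException` only to stay self-contained; a Connect transform would normally report the problem through its configuration validation instead.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; the PR's actual validators are not shown in this thread.
final class ListChecks {
    // Rejects both a missing list and an empty one, naming the setting in the message.
    static <T> List<T> requireNonEmpty(String configName, List<T> value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(
                "Configuration '" + configName + "' must be a non-empty list");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(requireNonEmpty("headers", Arrays.asList("trace")));
    }
}
```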
Even though I don't think it's reachable by users, should we have a message here?
It is impossible due to the ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), so this is really an assertion failure. The line number in the stacktrace would be enough to track it down if it ever did happen due to a later refactoring, so imho an error message is of no value. But I'm happy to add one if you like.
Ok, that's fair enough. Thanks
Not sure why the build failed; I've rekicked it.
…Header
These SMTs were originally specified in KIP-145 but never implemented at the time. HeaderTo is not included since its original specification doesn't deal with the fact that there can be >1 header with the same name, but a field can only have a single value (which could be an array, but not if the headers for the given name had different schemas).
a211f1e to c7ef2dc
The failing tests are those reported in KAFKA-12629, KAFKA-12284 and KAFKA-9295. All of these pass using the relevant JDKs on my machine. Merging to trunk.
These SMTs were originally specified in KIP-145 but never implemented at the time.
HeaderTo is not included since its original specification doesn't deal with the fact that there can be >1 header with the same name, but a field can only have a single value (while you could use an array, that doesn't work if the headers for the given name had different schemas).