KAFKA-5142: Add Connect support for message headers (KIP-145) #4319
rhauch wants to merge 15 commits into apache:trunk from
Conversation
Should check ! key.equals(this.key)
Yeah, I think that's a valid optimization.
this should be declared volatile (considering the double-checked locking below)
should we call class this something else to avoid confusion with org.apache.kafka.common.header.Headers?
No, I'd prefer to keep them the same name. There is no reason why a connector would ever use both Connect and Kafka header-related classes, and there are only a few places where Connect code uses both and it's easy enough to package-scope the types.
I am a bit worried that this parser could change the underlying data in small ways, and cause issues which become hard to debug. For example, the string " 0.11" would drop the initial spaces, and numbers in scientific notation might be translated into decimal (for example, 1E-2 becomes 0.01). I think this class is very useful, but do you think we want to have org.apache.kafka.connect.converters.ByteArrayConverter as the default converter?
Okay, what if we move all of this conversion logic into the Header.valueAsX methods? If we do that, I think we can use the StringConverter as the default and can avoid the SimpleHeaderConverter class altogether.
Yes, that would make sense. The transformations would be explicit. Should we use ByteArrayConverter as the default?
After thinking about this more, the existing StringConverter uses toString() to serialize all objects, which means for arrays and maps the string representation contains no quotes around strings and so would be relatively difficult to parse. So I'm not convinced we can do without SimpleHeaderConverter.
Also, I'm not sure why it'd be an issue if the default header converter parses the values into nominal values, even if that results in slight variations. First, existing connectors don't handle headers now. Second, any connector that did require the exact representation can specify a non-default header converter. Third, it makes more sense to me to make the headers meaningful and useful by default.
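The normalization being discussed can be reproduced with plain `java.math.BigDecimal` — this is a sketch of the effect only, not the actual `SimpleHeaderConverter` code: parsing drops incidental formatting such as leading whitespace, and re-serializing turns scientific notation into plain decimal form.

```java
import java.math.BigDecimal;

public class HeaderParseSketch {
    public static void main(String[] args) {
        // Leading whitespace in " 0.11" is lost once the value is parsed
        String raw = " 0.11";
        BigDecimal parsed = new BigDecimal(raw.trim());
        System.out.println(parsed.toPlainString()); // 0.11 (leading space gone)

        // Scientific notation is normalized to decimal form on re-serialization
        BigDecimal sci = new BigDecimal("1E-2");
        System.out.println(sci.toPlainString()); // 0.01
    }
}
```

Either direction of the debate is consistent with this behavior; the question is only whether such normalization is acceptable for the default converter.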
I think there is a lot of value in providing meaning to the headers. If we could make the transition explicit, that would be the best. Since Connect sits between many different systems with incompatible data models, a translation may cause unexpected issues in production. For example, a long in Java can hold values up to 2^63-1, but JavaScript would use double precision to handle big integers.
Regarding the problem of serializing maps, I agree with you (StringConverter and ByteArrayConverter won't work).
Would it be fine to think that if people are converting headers in their tasks or SMTs, then we can expect them to set SimpleHeaderConverter as their converter class? And if they never care about what is done to the headers, then it should be correct to set ByteArrayConverter as the default value for the header.converter property in WorkerConfig.java?
Yes, it is certainly possible to use the ByteArrayConverter in either the worker config or the connector config, and when this is used the byte array in each header value will simply be passed directly into the Headers object for each record.
I'm still not convinced that this should be the default header converter, though.
Test results are unrelated.
nit: we would probably get the same result with a single equals check covering key, schema, and value, instead of two if conditions.
I don't see these literals used anywhere
Nope, they're not. Removing them.
Maybe use Objects.requireNonNull
would SchemaAndValue be a better place to have this method?
For some time it was not obvious to me that SchemaAndValue is not part of the public API, so my concern about that would be that it'd have to become a public method on SchemaAndValue, which is currently just a simple container. Here, we can keep it out of the API altogether.
Got it. Maybe we could have something like SchemaAndValueUtils to include such utils for it. Just didn't feel great seeing this method in the ConnectHeaders class.
SchemaAndValue is definitely public, it's used in some public APIs. Agree that there might be some handy utilities that could be colocated. I'd wait to put them there (and deal with any potential KIP fallout) until we know there are multiple use cases. atm, we don't have multiple use cases.
However, @rhauch, one question would be whether ConnectSchema.validateValue could work here instead of these methods?
mageshn left a comment
Looks Great. Thanks Randall.
retainLatest() and this method have a lot in common. We could potentially refactor it, but I'm not too concerned if it's left as-is.
The duplication is not ideal, but since the logic varies deep within the loop it's actually relatively complex to implement with a single method using Java 7, and we'd actually end up with something that's more complex. With Java 8 it'd be super easy, so I guess I'd prefer to leave it as is.
i think leaving as is should be fine atm, and tbh at least they are both close enough together to be easily modified together. if we think this is useful enough, i'd file a jira dependent on the jdk8 update so we can follow up.
similar comment as retainLatest
same response as above. :-)
Probably not required to do special logic for ConnectHeaders. The equals check using the iterator below should suffice.
Maybe use Objects.requireNonNull
Exception message doesn't look right (the word "list").
@mageshn, thanks for the review. I added a commit to fix several of the issues you identified. I left the …
kkonstantine left a comment
Nice milestone. I left a few comments. Thanks @rhauch !
Sounds like a tautology to the name of the class. A bit more detail would be nice. Even Connect record values could be more specific.
I'll add more detail.
Why not use List (interface)?
There was a problem hiding this comment.
simply because we need to iterate backward and forward.
I'm a little bit confused about the concurrency (thread-safety) assumptions we have for this class.

- If it's meant to be used only by a single thread at a time, I'd expect declaring `headers` as volatile to suffice for the lazy initialization, and the object would be safe for hand-off between threads (but not concurrent access).
- If the pattern of double-checked locking (which I don't love even in its correct post-JDK5 form, but that's not the issue here) is necessary because there might be concurrent access of a `ConnectHeaders` object, I'd assume this class is broken, because iterators are shared and I'd expect `ConcurrentModificationException`s to occur (e.g. a header item is added by one thread while the headers list is traversed by another).

I'm pretty sure our case corresponds to 1). But I need to ask. If that's correct, I don't think we need the synchronized block around `headers` initialization and double-checked locking. Taking a look at the implementation of RecordHeaders, which seems to share similarities with this class, there's no synchronized access to its volatile field (the list there is final but there's a boolean that is volatile).
I hope I'm not missing something.
The whole point of this is to not actually create the list if there are no headers on a record. I'd expect that to be by far the prevalent case, so I thought it worthwhile to deal with that.
However, as you suggest, we should only have single threaded access, so the use of volatile and double-checked locking is, to say the least, ridiculous.
I get the intention for lazy initialization. Asked generally since I wanted to be 100% sure about the expectations w.r.t. concurrency.
I think it's best to keep volatile since the field is not final and might be accessed in sequence by different threads, but synchronized can be skipped if there's no concurrent execution by different threads.
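A minimal sketch of the single-threaded lazy initialization being agreed on here — a hypothetical class, not the real `ConnectHeaders`: the field stays `volatile` for safe hand-off between threads, but no `synchronized` block or double-checked locking is used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not the real ConnectHeaders): create the backing list
// only when the first header is added. Assumes single-threaded access, so no
// synchronized block or double-checked locking is needed.
public class LazyHeaders {
    // volatile keeps the reference safe for hand-off between threads,
    // even though no two threads ever access it concurrently
    private volatile List<String> headers;

    public LazyHeaders add(String header) {
        if (headers == null) {
            headers = new ArrayList<>(); // lazy initialization, no locking
        }
        headers.add(header);
        return this; // chaining, mirroring the Connect Headers style
    }

    public int size() {
        List<String> h = headers; // read the volatile field once
        return h == null ? 0 : h.size();
    }
}
```

The point of keeping the field nullable is that a record with no headers never allocates the list at all.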
I find it a little bit confusing that in some methods this.headers is used although there's no conflict with arguments (as happens here), while in other methods headers is used without this. to refer to the member field (e.g. method lastWithName). Should we avoid this. outside the constructor altogether?
the norm (ideally) for AK is to only use this. as needed to handle masking. i'm sure there are exceptions, but almost universally the use case is for constructors to avoid awkward parameter naming in order to avoid conflicting with a convenient member variable name.
Well, at least we should be consistent within a compilation unit, so I'll fix that.
Feels a bit strange that javadoc is added for trivial private methods as this one, but not for protected methods elsewhere. Maybe we want to be more specific with javadoc requirements.
I tend to put JavaDoc when the method throws an exception and it's not completely obvious when trying to use that method when that exception may occur. Hopefully that saves a step in the IDE when trying to use a method. And of course that cascades to needing full JavaDoc.
Let's keep it log everywhere. Since we have the checkstyle suppression anyways (still I'd prefer LOG :) )
Do we need a counter?
I was trying to not touch even more lines of code. But, yes, that's probably best.
let's fix it here and on the Exception message above. "available converters are: "
Should we use <code> vs double-quotes?
why are we hitting these for cases like ConnectHeader? doesn't look that complex. this is a big patch, with a bunch of new code so I was expecting some of these, but it seems like we're adding quite a lot of new exceptions.
ConnectHeader needed a suppression when it had all of the conversion methods, but since those are gone I can remove that suppression. ConnectHeaders and Values still are complex enough that they need a suppression.
not really critical, but this seems like a departure since we don't validate, e.g., topic. Why would you even use the version with headers as an argument if you weren't going to pass it in? another option would just be adding the non-null validation.
I guess I was just trying to be safe in case someone is passing in a reference they didn't know is null. Seemed easy enough to handle, especially since not handling such a case means the connector task dies. But I can clean it up a bit since the ConnectHeaders constructor can take a null value.
I'm fine with this, but some people prefer to do this via TimeUnit.
Time and Date in the same package do exactly this. Is it better to be consistent but use the constants, or to use TimeUnits here? I went with consistent but am happy to change it.
either way is fine, i don't really care, which is probably why Time and Date do this. was just noting since it is TimeUnit in plenty of other places
I think these are accidentally IS_ instead of ISO_.
It might be worth documenting what happens recursively and if the schema is obeyed. e.g., if I pass in a schema for Array<Double>, is that what I get back? If I pass in null for the schema, what is the type of the elements?
Updated the doc of this and other methods to hopefully be more clear. Basically, with convertToList, convertToMap, and convertToStruct, the schema is not used, but it is used in many of the numeric conversions when the supplied values are logical values. I kept it in the signature (a) to be consistent with the other forms, and (b) in case we need it for more complex conversions in the future.
do we actually want to use these conversions that just wrap/truncate values or should we be throwing errors if they are out of range? the latter seems better to me, but probably complicates the code as we can't rely only on asLong and casting...
I could go either way. Maybe we should do that as a followup PR?
we can follow up, but it is a significant behavior so we should lock it down before releasing this. otherwise the behavior change is a kip
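The behavioral difference under discussion — silently wrapping versus failing on out-of-range values — can be shown with plain Java. `Math.toIntExact` is one stdlib way to get the range-checked behavior; the actual Connect conversion code may differ.

```java
public class RangeCheckSketch {
    public static void main(String[] args) {
        long big = 4_000_000_000L; // does not fit in an int

        // Silent truncation: the narrowing cast wraps around
        int truncated = (int) big;
        System.out.println(truncated); // -294967296

        // Range-checked alternative: fail loudly instead of wrapping
        try {
            int checked = Math.toIntExact(big);
            System.out.println(checked); // never reached for this value
        } catch (ArithmeticException e) {
            System.out.println("out of range: " + e.getMessage());
        }
    }
}
```

Since the wrapping behavior is observable by connectors, locking down one of the two semantics before release avoids a KIP for a later behavior change.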
value is already a String, why would we need value.toString() here?
simply saves a cast.
ack, honestly not sure if saving the cast or saving the vtable lookup is better
same question about unnecessary toString here
Is this right? I thought byte[] would be encoded as base64? this call is guaranteed to be successful, but it will just replace "malformed" data with the default replacement string, which almost certainly is not what a user would want.
Yeah, that's what we discussed on the KIP discussion; I just neglected to make that change. Nice catch.
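The concern can be demonstrated with the JDK alone: decoding arbitrary bytes as UTF-8 never fails, but silently substitutes U+FFFD for malformed sequences and loses data, whereas Base64 (the encoding discussed on the KIP) round-trips the bytes exactly. This is a sketch of the behavior, not the converter code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class BinaryHeaderSketch {
    public static void main(String[] args) {
        byte[] value = {(byte) 0xFF, (byte) 0xFE, 0x41}; // not valid UTF-8

        // "Guaranteed to succeed" decoding: malformed bytes become U+FFFD
        String lossy = new String(value, StandardCharsets.UTF_8);
        System.out.println(lossy.contains("\uFFFD")); // true — data was lost

        // Base64 preserves the original bytes exactly
        String encoded = Base64.getEncoder().encodeToString(value);
        byte[] decoded = Base64.getDecoder().decode(encoded);
        System.out.println(Arrays.equals(value, decoded)); // true
    }
}
```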
This seems a bit repetitive of, but more restrictive than, what convertTo is doing. Do we actually need both versions? Could we just unify the code with some flag for project vs promote?
No, we don't need the two different forms. That was leftover from an earlier state. I've added another commit that cleans this up.
Added two commits that address comments from @ewencp and @kkonstantine, respectively.
what are EMPTY_HASH and EMPTY_ITERATOR trying to optimize? one case mentions immutability, but in general why would these be shared? i could see some concerns with a shared, standard set of headers being used across multiple records, then being passed to transforms, but wouldn't that always require copying the data, not just in the empty cases?
I expect that most records will not have headers, and so ConnectHeaders currently allows the internal list to be null. Using EMPTY_HASH was a simple way to ensure that the hash code is the same for both situations.
And, if it is true that most records won't have headers, then why construct a new Iterator every time it's needed? All of those cases (and only those cases) can share a single immutable EMPTY_ITERATOR. Since an iterator allows removing items (which we don't have in this case) but not adding them, there's no reason to have a per-usage Iterator instance.
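The shared-empty-iterator idea has a direct stdlib analogue; a hypothetical sketch (not the real `ConnectHeaders`):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the shared-empty-iterator idea: when there are no
// headers, hand out the JDK's shared immutable empty iterator instead of
// allocating a fresh iterator for every call.
public class EmptyIteratorSketch {
    private List<String> headers; // stays null until a header is added

    public Iterator<String> iterator() {
        // Collections.emptyIterator() is immutable, so sharing it is safe:
        // callers can neither add through it nor remove from it
        return headers == null ? Collections.emptyIterator() : headers.iterator();
    }
}
```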
might be true now, probably not true long term. also probably depends on where this is used - in a transformation for a source connector, it's likely for the foreseeable future that the headers are empty; for a sink connector, anywhere people have started using headers it is very unlikely they are empty.
the optimization is fine, i just watch for these things as they complicate the code and if they appear in the first version of code, usually aren't backed up by real data suggesting they are valuable.
I've seen this a few places -- SchemaAndValue already has SchemaAndValue.NULL field which does the same thing -- no need to repeat a bunch of times in a bunch of classes.
style nit: if the entire body is surrounded in a conditional, it's usually more readable to just check the negation and return, then reduce indentation with the rest of the body.
no real need to fix here, just a style thing to watch out for moving forward.
Oh, I actually prefer returning immediately, but thought that style was less preferred. Happy to change it.
are we trying to optimize something by not having headers always be non-null? seems like we complicate (and increase riskiness) of the implementation here by not just always assuming headers is non-null. Is the issue the mutability of this class and the headers collection?
super-nit: this should say Check that this is a Decimal, not date. repeated below for time and timestamp, but slightly less egregiously :)
nit: should be ConnectHeaders. probably would be easy to figure out, but better to just get it right :)
Just a reminder, ByteBuffer again :)
this seems a weird default given that headers are new? wouldn't the default be empty/nothing since currently converters don't assume default types?
Ok, I'm fine with having null for the default. The reason I went this way was because of the non-standard Converter.configure(Map<String, ?> configs, boolean isKey) method that is currently used for all key and value converters, so right now the only thing that uses the Configurable.configure(Map<String, ?> configs) method is the header converter. However, that already sets the converter type, so I'm fine with not having a default.
Actually, on second thought, there's no reason to have a default. As mentioned above, the current code always sets it for key, value, and header converters, so having a default is actually incorrect.
are we continuing to add layers (and time) loading things? might be the right tradeoff now, but if we keep adding slowdown, we should consider spending time optimizing it as well.
Yes, we are, but right now there's no other way to find plugins for HeaderConverter implementations (that don't also implement Converter). There's already KAFKA-6503 that I'd like to address in 1.1.
sounds good. hadn't seen that bug, but is something i had also mentioned when we added the new plugin.path stuff. i would also very much like to see this.
removing a bunch of boilerplate is great!
Changed the Connect API to add message headers as described in KIP-145.

The new `Header` interface defines an immutable representation of a Kafka header (name-value pair) with support for the Connect value types and schemas. Kafka headers have a string name and a binary value, which doesn't align well with Connect's existing data and schema mechanisms. Thus, Connect's `Header` interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types. And, as discussed below, a new `HeaderConverter` interface is added to define how the Kafka header binary values are converted to Connect data objects.

The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord`. Like the Kafka headers API, the Connect `Headers` interface allows storing multiple headers with the same key in an ordered list. The Connect `Headers` interface is mutable and has a number of methods that make it easy for connectors and transformations to add, modify, and remove headers from the record, and the interface is designed to allow chaining multiple mutating methods.

The existing constructors and methods in `ConnectRecord`, `SinkRecord`, and `SourceRecord` are unchanged to maintain backward compatibility, and in these situations the records will contain an empty `Headers` object that connectors and transforms can modify. There is also an additional constructor that allows an existing `Headers` to be passed in. A new overloaded form of the `newRecord` method was created to allow connectors and transforms to create a new record with an entirely new `Headers` object.

A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to serialize and deserialize headers between the in-memory representation and Kafka's byte[] representation.
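As a rough illustration of the API shape described above — a simplified hypothetical model, not the actual `org.apache.kafka.connect.header` classes: an immutable name-value `Header`, and a mutable ordered `Headers` collection that keeps duplicate names and supports method chaining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Simplified hypothetical model of the Header/Headers design (not the real
// Connect classes): immutable pair plus a mutable, ordered, chainable list.
public class HeadersModel {
    static final class Header {
        final String key;
        final Object value;
        Header(String key, Object value) {
            this.key = Objects.requireNonNull(key);
            this.value = value;
        }
    }

    static final class Headers {
        private final List<Header> headers = new ArrayList<>();

        Headers add(String key, Object value) {
            headers.add(new Header(key, value));
            return this; // chaining lets callers add several headers fluently
        }

        Header lastWithName(String key) {
            // duplicates are allowed, so search from the end for the latest
            for (int i = headers.size() - 1; i >= 0; i--) {
                if (headers.get(i).key.equals(key)) return headers.get(i);
            }
            return null;
        }

        int size() { return headers.size(); }
    }

    public static void main(String[] args) {
        Headers h = new Headers().add("trace", "abc").add("trace", "def");
        System.out.println(h.size());                      // 2 (duplicates kept)
        System.out.println(h.lastWithName("trace").value); // def
    }
}
```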
Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. The `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes. However, new unit tests were added for `SinkRecord` and `SourceRecord` to verify the header behavior, including when the `newRecord` methods are called.
This is the second commit for the public Connect API changes for KIP-145, and deals primarily with `HeaderConverter` implementations. Connect has three `Converter` implementations: `StringConverter`, `JsonConverter`, and `ByteArrayConverter`. These were modified to also implement `HeaderConverter`, without changing any of the existing functionality.

Like many of the pluggable components in Connect, the `HeaderConverter` interface extends `Configurable`, which allows implementations to expose a `ConfigDef` that describes the supported configuration properties, and a `config` method that can be used to initialize the component with provided configuration properties. The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` were changed to support these methods in a backward compatible manner. There are now `StringConverterConfig` and `JsonConverterConfig` classes that define the `ConfigDef` for the implementations; the `ByteArrayConverter` has no configuration properties and doesn't need a config class.

Note that the existing `Converter` interface has a special `configure` signature with a parameter that says whether the converter is being used for keys or values. This is different than the `Configurable.configure` signature, so this commit adds a new `ConverterConfig` abstract class that defines a `converter.type` property that can be used to set whether the converter is being used for keys, values, or headers. The existing `Converter` methods internally set this property based upon the supplied boolean parameter, so the default for `converter.type` can be `header`.
This is the third commit for KIP-145 and changes the Connect runtime to support headers. Each Connect worker now configures a `HeaderConverter` for each connector task, in the same way it creates key and value `Converter` instances. This is entirely backward compatible, so that existing worker and connector configurations will work without changes. By default, the worker will use the `SimpleHeaderConverter` to serialize header values as strings and to deserialize them by inferring the schemas.
… than only Headers (KIP-145)
Changed the design of the public API to reflect the latest KIP-145 discussion. Also corrected several checkstyle and findbugs warnings.
Removed the `Header.valueAsX()` methods and replaced them with `Values.convertToX(Schema, Object)` methods that can be used outside of headers. These methods handle converting between all of the primitives and logical types, as well as between string and arrays/maps.
Removed unused constants and unnecessary block, and added optimization for `Header.rename(newKey)` when the new key is the same as the old key.
Rebased due to several conflicts with changes already on …
LGTM. One good run, one failing on flaky core integration tests, and one that seems to be linking to the wrong job... Merging to trunk for 1.1.0.
@Test
public void shouldCreateSinkRecordWithEmtpyHeaders() {
I think you meant shouldCreateSourceRecordWithEmtpyHeaders?
@Test
public void stringHeaderToConnect() {
    assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectHeader(TOPIC, "headerName", "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro. By default, the SimpleHeaderConverter is used to serialize" +
" header values to strings and deserialize them by inferring the schemas.";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();
since you have multiple implementations of HeaderConverter, should we add a list validator here listing them?
this class has some fairly large methods. it might be worthwhile to break them into smaller methods for two reasons: (1) better readability; and (2) the JVM does throw in some better optimizations (inlining, for example) when there are small methods.
}
// Missing either a comma or an end delimiter
if (COMMA_DELIMITER.equals(parser.previous())) {
    throw new DataException("Malformed array: missing element after ','");
Could you try running this test:
SchemaAndValue arr = Values.parseString("[1, 2, 3,,,]");
// expect an Exception.
I think it returns incorrect values. The trailing commas return the value of parser.original() from line 836. It should return something more local here.
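A hypothetical sketch of the stricter parsing being requested, rejecting the empty elements produced by trailing or repeated commas rather than falling back to the raw input (this is not the `Values.parseString` implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class ArrayParseSketch {
    // Hypothetical strict array parser: "[1, 2, 3,,,]" fails instead of
    // quietly returning the original string.
    static List<String> parseArray(String s) {
        if (!s.startsWith("[") || !s.endsWith("]")) {
            throw new IllegalArgumentException("Not an array: " + s);
        }
        String body = s.substring(1, s.length() - 1).trim();
        List<String> elements = new ArrayList<>();
        if (body.isEmpty()) {
            return elements; // "[]" is a valid empty array
        }
        // split with limit -1 keeps trailing empty tokens, so repeated or
        // trailing commas surface as empty elements we can detect and reject
        for (String token : body.split(",", -1)) {
            String element = token.trim();
            if (element.isEmpty()) {
                throw new IllegalArgumentException("Malformed array: missing element after ','");
            }
            elements.add(element);
        }
        return elements;
    }
}
```

With this behavior, `parseArray("[1, 2, 3,,,]")` throws instead of returning the raw input.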
Refer to this link for build results (access rights to CI server needed):
Is there any plan to implement the SMTs (as specified by KIP-145) as well?
KIP-145 has been accepted, and this PR implements KIP-145 except without the SMTs.
Changed the Connect API and runtime to support message headers as described in KIP-145.
The new `Header` interface defines an immutable representation of a Kafka header (key-value pair) with support for the Connect value types and schemas. This interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types.

The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord` (and thus `SourceRecord` and `SinkRecord`). This does allow multiple headers with the same key. The `Headers` interface contains methods for adding, removing, finding, and modifying headers. Convenience methods allow connectors and transforms to easily use and modify the headers for a record.

A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to serialize and deserialize headers between the in-memory representation and Kafka's byte[] representation. A new `SimpleHeaderConverter` implementation has been added, and this serializes to strings and deserializes by inferring the schemas (`Struct` header values are serialized without the schemas, so they can only be deserialized as `Map` instances without a schema). The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` have all been extended to also be `HeaderConverter` implementations. Each connector can be configured with a different header converter, although by default the `SimpleHeaderConverter` is used to serialize header values as strings without schemas.

Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. Additional test methods are added for the methods added to the `Converter` implementations. Finally, the `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes.

Committer Checklist (excluded from commit message)