Skip to content

KAFKA-7273: extend Connect Converter to support headers#6362

Merged
rhauch merged 7 commits intoapache:trunkfrom
sap1ens:add-headers-to-converters
Sep 25, 2019
Merged

KAFKA-7273: extend Connect Converter to support headers#6362
rhauch merged 7 commits intoapache:trunkfrom
sap1ens:add-headers-to-converters

Conversation

@sap1ens
Copy link
Copy Markdown
Contributor

@sap1ens sap1ens commented Mar 4, 2019

Update: See KIP-440, approved.

Extending Converter interface to support headers in backwards-compatible way. Very similar to Serializer and Deserializer interfaces.

A few questions I have to the contributors:

  • Do I need to update internal components like KafkaStatusBackingStore that use Converters? It doesn't look like they use headers in the moment, so I don't think it's necessary.
  • How about other Converter implementations like JsonConverter? Should they all be switched to the new interface method even if they don't use the headers?
  • Is KIP required in this case? Converter is a public interface, but the change is trivial and backwards-compatible. Created

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor

@ryannedolan ryannedolan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this change makes sense, and also duplicates an existing API. See HeaderConverter and KIP-145.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect

Comment thread connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java Outdated
Comment thread connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java Outdated
@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Mar 4, 2019

I don't think this change makes sense, and also duplicates an existing API. See HeaderConverter and KIP-145.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect

HeaderConverter handles use-case when you need to transform headers. I don't want to transform them, I simply want to get access to the headers in my converter, together with the payload.

@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Mar 5, 2019

Hey @ewencp, @hachikuji, any chance you can take a look? Thanks

Copy link
Copy Markdown
Contributor

@ewencp ewencp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left one (major) comment. Didn't review rest in detail, but most looks straightforward and reasonable -- with the caveat that this particular direction warrants much broader review.

Comment thread connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java Outdated
@ewencp
Copy link
Copy Markdown
Contributor

ewencp commented Mar 5, 2019

@sap1ens regarding KIPs:

  • I agree, I think your use case might be different from HeaderConverter and an interface to handle multiple components of the message together could have value. HeaderConverter was intended to extend existing "isolated" converters, but may not have accounted for interactions between headers and keys/values.
  • This does require a KIP. At the potential cost of formality and overhead, we prefer to be very careful about public interfaces. "simple and backwards compatible" isn't sufficient to not require review -- a backwards compatible addition is still a commitment of support moving forward and a limitation on evolvability. (<-- is just stating rules, not any sort of "no" or discouragement from making the change! the KIP process just makes us thoroughly evaluate and think through the implications of a change like this)

Many folks are happy to help guide through KIP process (which can be pretty easy in some cases, but tbh I suspect this case will require some significant discussion and thought!). Off the top of my head, @kkonstantine @rhauch @wicknicks @rayokota can probably review and help with guidance on KIP and design.

@ryannedolan
Copy link
Copy Markdown
Contributor

Suggest adding unit tests to ensure headers are preserved.

@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Mar 12, 2019

@sap1ens sap1ens changed the title KAFKA-7273: [WIP] extend Connect Converter to support headers KAFKA-7273: extend Connect Converter to support headers Mar 12, 2019
@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Mar 24, 2019

@sap1ens sap1ens force-pushed the add-headers-to-converters branch from a968e91 to 3d019fa Compare August 22, 2019 04:22
@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Aug 22, 2019

KIP has been accepted. I've pushed another commit with extra tests.

Copy link
Copy Markdown
Contributor

@ryannedolan ryannedolan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a unit tests that covers the scenario described as motivation in the KIP, where serde is influenced by some header field (rather than just passed through).

Copy link
Copy Markdown
Contributor

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good, though I have a few comments / suggestions below.

Comment thread connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java Outdated
@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Aug 26, 2019

@ryannedolan @rhauch thanks for the feedback so far, I've addressed it with commit 009212c

@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Aug 26, 2019

I've also had to suppress ClassFanOutComplexity check for WorkerSinkTaskTest/WorkerSourceTaskTest files. They are pretty big and test a lot of different use-cases. Splitting into different test files can help, but it's a big and not very obvious change.

@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Sep 4, 2019

@ryannedolan @rhauch hi, just wanted to check if you have time to take a look at the latest changes, thanks!

Copy link
Copy Markdown
Contributor

@ryannedolan ryannedolan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the improved tests, lgtm

Copy link
Copy Markdown
Contributor

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this is really good. My only concern is the use of Java serialization, which would be good to avoid because of the security implications, even if it's only used in test code.

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented Sep 18, 2019

@sap1ens yes, this looks good. Please resolve conflicts and ping me again. Thanks!

# Conflicts:
#	connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Sep 20, 2019

@rhauch merge conflict has been resolved, this PR is ready.

@sap1ens
Copy link
Copy Markdown
Contributor Author

sap1ens commented Sep 21, 2019

Failed test seems to be completely unrelated (flaky integration test perhaps?):

kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersAndNoop

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented Sep 25, 2019

This PR does not change all of the uses of the older Converter methods:

  • JsonConverter and its tests
  • StringConverter and its tests
  • ByteArrayConverter and its tests
  • NumberConverter and its tests
  • KafkaConfigBackingStore and its tests
  • KafkaStatusBackingStore and its tests
  • OffsetStorageReaderImpl
  • OffsetStorageWriter and its tests

However, I think this is perfectly acceptable for a couple of reasons: changing those would increase the size of this PR, while leaving them as-is helps verify backward compatibility of converters that only implement the older methods.

The PR does change the important call sites in WorkerSinkTask and WorkerSourceTask, which are the primary places where converters are used (except for the internal topic records in the storage-related classes that should always use the JsonConverter that doesn't need the new methods).

Copy link
Copy Markdown
Contributor

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks, @sap1ens!

@rhauch rhauch merged commit 70d1bb4 into apache:trunk Sep 25, 2019
ijuma added a commit to confluentinc/kafka that referenced this pull request Sep 29, 2019
Conflicts:
* .gitignore: addition of clients/src/generated-test was near
local additions for support-metrics.
* checkstyle/suppressions.xml: upstream refactoring of exclusions
for generator were near the local changes for support-metrics.
* gradle.properties: scala version bump caused a minor conflict
due to the kafka version change locally.
gradle/dependencies.gradle: bcpkix version bump was near avro
additions in the local version.

* apache-github/trunk: (49 commits)
  KAFKA-8471: Replace control requests/responses with automated protocol (apache#7353)
  MINOR: Don't generate unnecessary strings for debug logging in FetchSessionHandler (apache#7394)
  MINOR:fixed typo and removed outdated varilable name (apache#7402)
  KAFKA-8934: Create version file during build for Streams (apache#7397)
  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (apache#7382)
  KAFKA-6883: Add toUpperCase support to sasl.kerberos.principal.to.local rule (KIP-309)
  KAFKA-8907; Return topic configs in CreateTopics response (KIP-525) (apache#7380)
  MINOR: Address review comments for KIP-504 authorizer changes (apache#7379)
  MINOR: add versioning to request and response headers (apache#7372)
  KAFKA-7273: Extend Connect Converter to support headers (apache#6362)
  MINOR: improve the Kafka RPC code generator (apache#7340)
  MINOR: Improve the org.apache.kafka.common.protocol code (apache#7344)
  KAFKA-8880: Docs on upgrade-guide (apache#7385)
  KAFKA-8179: do not suspend standby tasks during rebalance (apache#7321)
  KAFKA-8580: Compute RocksDB metrics (apache#7263)
  KAFKA-8880: Add overloaded function of Consumer.committed (apache#7304)
  HOTFIX: fix Kafka Streams upgrade note for broker backward compatibility (apache#7363)
  KAFKA-8848; Update system tests to use new AclAuthorizer (apache#7374)
  MINOR: remove unnecessary null check (apache#7299)
  KAFKA-6958: Overload methods for group and windowed stream to allow to name operation name using the new Named class (apache#6413)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants