Skip to content

KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records#14024

Merged
C0urante merged 6 commits intoapache:trunkfrom
yashmayya:KAFKA-13431-KIP-793
Jul 21, 2023
Merged

KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records#14024
C0urante merged 6 commits intoapache:trunkfrom
yashmayya:KAFKA-13431-KIP-793

Conversation

@yashmayya
Copy link
Copy Markdown
Contributor

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@yashmayya yashmayya added connect kip Requires or implements a KIP labels Jul 17, 2023
Comment on lines 198 to 202
Copy link
Copy Markdown
Contributor Author

@yashmayya yashmayya Jul 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is binary compatible. Source / API compatibility is a bit trickier here however. The existing public constructors for SinkRecord have been modified to simply use topic, kafkaPartition, and kafkaOffset for originalTopic, originalKafkaPartition, and originalKafkaOffset respectively (so records created outside the Connect framework that were equal earlier will continue being equal and vice versa). The new constructor which includes the original topic, partition, and offset is now being used by the framework. This means that if there are 2 records with the same post-transform topic, partition, and offset but different pre-transform topic, partition or offset they will now be considered as unequal.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed analysis!

Anticipating some possible use cases that may be impacted by this, I've come up with:

  1. Equality testing in unit tests
  2. Storing sink records in data structures (e.g., elements in a HashSet or keys in a HashMap) which contain elements that are unique according to their equals method

With 1, this change is unlikely to be controversial.

With 2, things are a little trickier, but ultimately I think it's best to proceed with this change. There's enough granularity in the existing equals method that I can't anticipate any realistic cases where that method would previously return true but would now return false. For example, if a connector is tracking records that were redelivered to it via Sink::put after throwing an exception in SinkTask::preCommit, this change won't affect that case since the re-delivered records would have the same original TPO.

Also, FWIW, there's precedent here with when we added headers to Connect in KIP-145; there was no discussion on the PR regarding the impact that changes to these methods may have on compatibility.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connector can receive two SinkRecords which may pass equality if they have the same T/P/O, and the same contents, header, etc. Right now this can happen when an SMT causes a T/P/O collision, and if a record is re-delivered to the connector. After this change, only a re-delivery can cause this equality check to fire.

I don't think it would be reasonable for someone to rely on this to detect T/P/O collisions, because it is inherently unreliable due to checking equality of the contents of the record. I think the semantics for only being equal after a re-delivery is much better.

We don't seem to rely on this equals method in the runtime, so this should only affect connector implementations.

Comment on lines 210 to 217
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the above change, this is binary compatible too. Although this one could potentially break some oddball downstream use cases like tests asserting exact hash code values for sink records. While this isn't strictly source / API compatible, it should be fairly unlikely that such use cases exist in the wild. If this is still deemed to be a concern (or more cases crop up), this change can be removed without affecting the rest of this PR / KIP.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

Copy link
Copy Markdown
Contributor

@gharris1727 gharris1727 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Yash!

I had some minor questions.

Comment on lines 198 to 202
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connector can receive two SinkRecords which may pass equality if they have the same T/P/O, and the same contents, header, etc. Right now this can happen when an SMT causes a T/P/O collision, and if a record is re-delivered to the connector. After this change, only a re-delivery can cause this equality check to fire.

I don't think it would be reasonable for someone to rely on this to detect T/P/O collisions, because it is inherently unreliable due to checking equality of the contents of the record. I think the semantics for only being equal after a re-delivery is much better.

We don't seem to rely on this equals method in the runtime, so this should only affect connector implementations.

Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Copy link
Copy Markdown
Contributor

@C0urante C0urante left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Yash! Lot of nits but overall this is looking great.

Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment on lines 198 to 202
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed analysis!

Anticipating some possible use cases that may be impacted by this, I've come up with:

  1. Equality testing in unit tests
  2. Storing sink records in data structures (e.g., elements in a HashSet or keys in a HashMap) which contain elements that are unique according to their equals method

With 1, this change is unlikely to be controversial.

With 2, things are a little trickier, but ultimately I think it's best to proceed with this change. There's enough granularity in the existing equals method that I can't anticipate any realistic cases where that method would previously return true but would now return false. For example, if a connector is tracking records that were redelivered to it via Sink::put after throwing an exception in SinkTask::preCommit, this change won't affect that case since the re-delivered records would have the same original TPO.

Also, FWIW, there's precedent here with when we added headers to Connect in KIP-145; there was no discussion on the PR regarding the impact that changes to these methods may have on compatibility.

Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment on lines 210 to 217
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java Outdated
Comment thread connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java Outdated
Copy link
Copy Markdown
Contributor Author

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review folks!

Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Comment thread connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java Outdated
Comment thread connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java Outdated
Copy link
Copy Markdown
Contributor

@C0urante C0urante left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Yash! Changes look good. I've kicked off another CI run since the last one failed for what appeared to be unrelated reasons.

I'd prefer to merge this after #14044 lands, but have no more review comments to give.

@C0urante
Copy link
Copy Markdown
Contributor

@gharris1727 did you want to give this another pass before we merge? No worries if you don't have the bandwidth.

Copy link
Copy Markdown
Contributor

@gharris1727 gharris1727 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks @yashmayya for fixing this gap in the API!

@C0urante
Copy link
Copy Markdown
Contributor

Jenkins is failing because it's merging a commit on trunk that doesn't include #14037 before testing, which causes the rat task for Gradle to fail.

I've pushed an empty commit to retrigger CI; hopefully that'll do the trick and pick up the latest changes from trunk.

@C0urante
Copy link
Copy Markdown
Contributor

@yashmayya Mind patching the merge conflicts introduced by #14044? Thanks!

@yashmayya yashmayya force-pushed the KAFKA-13431-KIP-793 branch from 5d8026c to 4ef3a9e Compare July 20, 2023 14:47
@yashmayya
Copy link
Copy Markdown
Contributor Author

Thanks Chris, I've rebased this on the latest trunk.

@C0urante
Copy link
Copy Markdown
Contributor

Jenkins build is pretty unstable but this appears due to unrelated issues. We have a successful build (minus a few flaky tests) on two out of four nodes in the latest run. Merging...

@C0urante C0urante merged commit 4daeb27 into apache:trunk Jul 21, 2023
Cerchie pushed a commit to Cerchie/kafka that referenced this pull request Jul 25, 2023
…ion and offset in sink records (apache#14024)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
jeqo pushed a commit to aiven/kafka that referenced this pull request Aug 15, 2023
…ion and offset in sink records (apache#14024)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connect kip Requires or implements a KIP

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants