Skip to content

KAFKA-9533: ValueTransform forwards null values#8108

Merged
bbejeck merged 5 commits intoapache:trunkfrom
mviamari:KAFKA-9533
Feb 19, 2020
Merged

KAFKA-9533: ValueTransform forwards null values#8108
bbejeck merged 5 commits intoapache:trunkfrom
mviamari:KAFKA-9533

Conversation

@mviamari
Copy link
Copy Markdown
Contributor

@mviamari mviamari commented Feb 13, 2020

Fixes a bug where KStream#transformValues would forward null values from the provided ValueTransform#transform operation.

A test was added for verification KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull. A parallel test for non-key ValueTransformer was not added, as they share the same code path.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 13, 2020

Ok to test.

@mviamari
Copy link
Copy Markdown
Contributor Author

I'm not sure why these tests failed.

I think that one is being handled with #8109. The other might be related to #8094.

@mviamari
Copy link
Copy Markdown
Contributor Author

@bbejeck Is there anything I should do about these failures? I don't think they are related to my change, but that could be my unfamiliarity with the codebase.

Also, should I expect that fixes for these failed tests will need to be merged before this PR can be merged?

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 14, 2020

Java 11 failed with org.apache.kafka.streams.integration.RegexSourceIntegrationTest > testRegexMatchesTopicsAWhenDeleted test ticket exists.

Java 8 failed with kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout

@mviamari, the test failures are unrelated, but we need a green build for merging. The RegexSourceIntegrationTest test exposes an issue, and once the fix is merged, we can retest this PR. In the meantime, I'll make time to review this PR soon.

@mviamari
Copy link
Copy Markdown
Contributor Author

Sounds good. Thanks for clarifying.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 17, 2020

retest this please

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch @mviamari LGTM

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 17, 2020

ping @cadonna for another look before merging

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mviamari Thank you for the PR.

Here my comments.

Michael Viamari added 2 commits February 17, 2020 15:05
* Fix for KStreamTransformValues to filter `null` values returned from ValueTransformer
* Tests for `shouldNotForwardNullTransformValuesWithValueTransformerWithKey()`.
* Remove integration test KStreamTransformIntegrationTest#esWithValueTransformerWithKey
* Add unit test KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull
@mviamari
Copy link
Copy Markdown
Contributor Author

@cadonna Updated. I completely missed those unit tests the first time around, thanks for pointing that out.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 18, 2020

Retest this please.

1 similar comment
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 18, 2020

Retest this please.

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mviamari Thank you for the update.

…sNull to use EasyMock instead of TestDriver.
Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mviamari Thank you again for the update.

I have just minor comments of which most are due to me missing them in my previous review. Sorry!

After these changes, the PR is ready to merge.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 18, 2020

previous test results gone.

retest this please.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 18, 2020

Retest this please.

1 similar comment
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 18, 2020

Retest this please.

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mviamari LGTM, just a minor comment on the reset().

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 19, 2020

Java 8 failed with kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout

Java 11 passed.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 19, 2020

Retest this please.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 19, 2020

Ok to test

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 19, 2020

Retest this please.

1 similar comment
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 19, 2020

Retest this please.

@bbejeck bbejeck merged commit a41d3d8 into apache:trunk Feb 19, 2020
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 19, 2020

Merged #8108 into trunk

@cadonna
Copy link
Copy Markdown
Member

cadonna commented Feb 20, 2020

Thank you for the fix, @mviamari !

bbejeck pushed a commit that referenced this pull request Feb 20, 2020
Fixes a bug where KStream#transformValues would forward null values from the provided ValueTransform#transform operation.

A test was added for verification KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull. A parallel test for non-key ValueTransformer was not added, as they share the same code path.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
bbejeck pushed a commit that referenced this pull request Feb 20, 2020
Fixes a bug where KStream#transformValues would forward null values from the provided ValueTransform#transform operation.

A test was added for verification KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull. A parallel test for non-key ValueTransformer was not added, as they share the same code path.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
bbejeck pushed a commit that referenced this pull request Feb 20, 2020
Fixes a bug where KStream#transformValues would forward null values from the provided ValueTransform#transform operation.

A test was added for verification KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull. A parallel test for non-key ValueTransformer was not added, as they share the same code path.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
bbejeck pushed a commit that referenced this pull request Feb 20, 2020
Fixes a bug where KStream#transformValues would forward null values from the provided ValueTransform#transform operation.

A test was added for verification KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull. A parallel test for non-key ValueTransformer was not added, as they share the same code path.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 20, 2020

cherry-picked to branches 2.5, 2.4, 2.3 and 2.2

ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 24, 2020
* apache-github/trunk: (23 commits)
  KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154)
  HOTFIX: fix NPE in Kafka Streams IQ (apache#8158)
  MINOR: set scala version automatically based on gradle.properties
  KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142)
  KAFKA-9441: Add internal TransactionManager (apache#8105)
  MINOR: Document endpoints for connector topic tracking (KIP-558)
  MINOR: Standby task commit needed when offsets updated (apache#8146)
  KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111)
  MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py
  KAFKA-9586: Fix errored json filename in ops documentation
  KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade
  KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058)
  HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140)
  MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337)
  MINOR: Improve EOS example exception handling (apache#8052)
  MINOR: Fix a number of warnings in clients test (apache#8073)
  MINOR: Update shell scripts to support z/OS system (apache#7913)
  MINOR: Wording fix in Streams DSL docs (apache#5692)
  MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141)
  KAFKA-9533: ValueTransform forwards `null` values (apache#8108)
  ...
@vvcephei
Copy link
Copy Markdown
Contributor

Hey @mviamari ,

It turns out that this fix causes more problems than it fixes :)

I've made a comment on the ticket explaining the situation. I'm copying it here for visibility:

Since merging this fix into trunk, we've discovered some use cases that actually depend on the prior behavior. In retrospect, I think that we should fix this issue by correcting the docs, not changing the behavior.

I bet this started with a simple copy/paste error from the transform API, in which returning null (as opposed to a KeyValue does mean to drop the record). But in that case, it makes sense, since we cannot process a record with no key. The choice is between throwing an NPE and dropping the record, so we drop the record. But this doesn't apply to transformValues, because the result would have the same key as the input.

Really, it's murky territory either way... Since we have transorm and flatTransform, as well as the xValues overloads, it implies that transform is one-in-to-one-out and flatTransform is one-in-to-zero-or-more-out. In that case, if you did want to drop an input record, you should use a flatTransform and return an empty collection. There should be an invariant for transform and transformValues that you get exactly the same number of output records as input records, whether they are null or otherwise.

Since it seems like the docs, not the behavior, are wrong, and since this is pre-existing behavior going back a long way in Streams (which breaks some users' code to change), we should go ahead and back out the PR. Sorry for the confusion.

As far as the actual fix goes, I'd be in favor of amending the docs to state that:

  1. When KStream#transformValues returns null, there will simply be an output record with a null value.
  2. When KStream#transform returns null, we consider that to be an invalid return, and we will log a warning while dropping the record (similar to other APIs in Streams)
  3. When KStream#transform returns a KeyValue(key, null), there will simply be an output record with a null value.
  4. When KStream#flatTransform and KStream#flatTransformValues return null, we consider that to be equivalent to returning an empty collection, in which case, we don't forward any results, and also do not log a warning. (I'd also be in favor of logging a warning here for consistency, but it seems like overkill).

Accordingly, I'm going to re-open this ticket, and Bill will revert the changes to fix the downstream builds. Sorry again for the trouble.
-John

bbejeck added a commit to bbejeck/kafka that referenced this pull request Feb 25, 2020
bbejeck added a commit that referenced this pull request Feb 25, 2020
bbejeck added a commit that referenced this pull request Feb 25, 2020
bbejeck added a commit that referenced this pull request Feb 25, 2020
bbejeck added a commit that referenced this pull request Feb 25, 2020
bbejeck added a commit that referenced this pull request Feb 25, 2020
…8167)

This reverts commit a41d3d8.

Reviewers: Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 25, 2020

reverted cherry-picks to 2.5, 2.4, 2.3, and 2.2

@mviamari mviamari deleted the KAFKA-9533 branch February 26, 2020 18:11
qq619618919 pushed a commit to qq619618919/kafka that referenced this pull request May 12, 2020
Fixes a bug where KStream#transformValues would forward null values from the provided ValueTransform#transform operation.

A test was added for verification KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull. A parallel test for non-key ValueTransformer was not added, as they share the same code path.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
qq619618919 pushed a commit to qq619618919/kafka that referenced this pull request May 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants