
MINOR: Fixing null handling in ValueAndTimestampSerializer #7679

Merged
guozhangwang merged 2 commits into apache:trunk from ncreep:null-handling-in-value-timestamp-serializer
Feb 5, 2020

Conversation

@ncreep
Contributor

@ncreep ncreep commented Nov 11, 2019

Since `ValueAndTimestampSerializer` wraps an unknown `Serializer`, the output of that `Serializer` can be `null`, in which case the line

```java
.allocate(rawTimestamp.length + rawValue.length)
```

will throw a `NullPointerException`.

This pull request returns null instead.

Not sure where to place tests for this; any suggestions would be appreciated.

Thanks

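A minimal, self-contained sketch of the guard this PR proposes. The class and helper names below are simplified stand-ins, not the actual Kafka code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimestampedSerializeSketch {

    // Stand-in for the wrapped Serializer<V>; a real implementation may return null.
    public interface InnerSerializer<V> {
        byte[] serialize(String topic, V data);
    }

    public static <V> byte[] serialize(final InnerSerializer<V> valueSerializer,
                                       final String topic,
                                       final V data,
                                       final long timestamp) {
        final byte[] rawValue = valueSerializer.serialize(topic, data);
        // The guard this PR adds: if the inner serializer produced null bytes
        // (a tombstone), return null instead of dereferencing rawValue.length.
        if (rawValue == null) {
            return null;
        }
        final byte[] rawTimestamp = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
        return ByteBuffer.allocate(rawTimestamp.length + rawValue.length)
                .put(rawTimestamp)
                .put(rawValue)
                .array();
    }

    public static void main(final String[] args) {
        final InnerSerializer<String> nullReturning = (topic, data) -> null;
        final InnerSerializer<String> utf8 = (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        System.out.println(serialize(nullReturning, "t", "anything", 42L)); // null, no NPE
        System.out.println(serialize(utf8, "t", "v", 42L).length);          // 9: 8-byte timestamp + 1 byte value
    }
}
```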
@ncreep ncreep changed the title Fixing null handling in ValueAndTimestampSerializer MINOR: Fixing null handling in ValueAndTimestampSerializer Nov 12, 2019
@ncreep
Contributor Author

ncreep commented Dec 1, 2019

I'm not quite sure what the etiquette is for drawing attention to a pull request here.
So, pinging the only contributor to the relevant file, @mjsax.

Thanks

@mjsax
Member

mjsax commented Dec 2, 2019

Why would this be valid? Returning null for non-null data seems to be an incorrect implementation of the wrapped serializer? And the null data case is handled? Can you elaborate on this change?

To be fair, Kafka "core" does not really specify how serialization has to work, however, because of compacted topics and tombstone handling, Kafka Streams is more opinionated and requires a null-to-null mapping -- otherwise, many assumptions in the Kafka Streams code base fall apart.

Seems we missed adding a unit test for this class -- can you create one?

@ncreep
Contributor Author

ncreep commented Dec 2, 2019

I'm not sure whether what I'm doing is correct or some sort of abuse, but as you said, there's no explicit contract for serialization.

So I'm doing the following: I want to have something like `Optional`, but for values that I pass through Kafka. The idea is to avoid actual null values flowing through my code, so my serde looks something like the following:

```java
byte[] serialize(String topic, StoredValue<T> data) {
  if (data.isEmpty()) return null;
  else return ...; // serialize T
}

StoredValue<T> deserialize(String topic, byte[] data) {
  if (data == null) return StoredValue.empty();
  else return new Present<>(...); // deserialize T
}
```

The point being that `null` gets deserialized into a `StoredValue.empty()`, but serialized back into `null` when going into Kafka. So while `null` is flowing through Kafka (for all tombstone-like purposes), the code is safe from nulls.

Given the code above, the current behavior of `ValueAndTimestampSerializer` is to throw a `NullPointerException` (since the non-null `StoredValue.empty()` is converted into `null`).

Is this a legitimate use-case?
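To make the pattern concrete, here is a runnable, self-contained rendering of this round trip; `StoredValue` and its methods are reconstructed from the description above, not taken from any library:

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class StoredValueDemo {

    // Minimal Optional-like wrapper reconstructed from the comment above.
    public static final class StoredValue<T> {
        private static final StoredValue<?> EMPTY = new StoredValue<>(null);
        private final T value;

        private StoredValue(final T value) { this.value = value; }

        @SuppressWarnings("unchecked")
        public static <T> StoredValue<T> empty() { return (StoredValue<T>) EMPTY; }

        public static <T> StoredValue<T> present(final T value) {
            return new StoredValue<>(Objects.requireNonNull(value));
        }

        public boolean isEmpty() { return value == null; }
        public T get() { return value; }
    }

    // Empty values serialize to null bytes, i.e. a Kafka tombstone.
    public static byte[] serialize(final String topic, final StoredValue<String> data) {
        return data.isEmpty() ? null : data.get().getBytes(StandardCharsets.UTF_8);
    }

    // Null bytes deserialize back to empty, so client code never sees null.
    public static StoredValue<String> deserialize(final String topic, final byte[] data) {
        return data == null ? StoredValue.empty()
                            : StoredValue.present(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(final String[] args) {
        System.out.println(serialize("t", StoredValue.empty()) == null); // true: empty -> tombstone
        System.out.println(deserialize("t", null).isEmpty());            // true: tombstone -> empty
    }
}
```

With such a serde, a non-null `StoredValue.empty()` reaches `ValueAndTimestampSerializer`, whose inner serializer then returns null bytes -- the situation this PR guards against.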

@ncreep
Contributor Author

ncreep commented Dec 24, 2019

@mjsax, a reminder, in case this slipped through the cracks...

Thanks

@mjsax
Member

mjsax commented Dec 27, 2019

Thanks for the reminder. I see what you are saying, but it won't fit into the KS "ecosystem". If you have a "tombstone", KS can only process the tombstone correctly if the value is null. I.e., if you have an Optional type and you want to put it into a state store to delete something, the value must be null, i.e., put(key, null). Hence we don't allow ValueAndTimestamp(null), as it would "break" proper deletes.

Not sure if there is a way to avoid nulls flowing through Kafka Streams, but atm the whole code base uses and relies on nulls. For example, if we get a null key/value, we often even skip calling the serde and just pass null along.

cc @guozhangwang @bbejeck @vvcephei Do you have any good ideas / advice on how to handle this case?

@mjsax
Member

mjsax commented Jan 9, 2020

Ping @guozhangwang @bbejeck @vvcephei

@guozhangwang
Contributor

guozhangwang commented Jan 9, 2020

Since null values are treated specially, as a tombstone, in Kafka, the Streams library currently follows this protocol:

  1. if a null object is passed in, we skip the serializer and just return null bytes, indicating a tombstone.
  2. if a non-null object gets serialized into null bytes, it is considered a tombstone as well.
  3. if null bytes are read from Kafka (we should never read null bytes from a store), we skip the deserializer and just return a null object.
  4. for non-null bytes read from the store / Kafka, we deserialize, and the result may still be a null object.

@ncreep for your case, if the passed-in object is null, the serde would be skipped and null bytes would be returned; would that be sufficient?

But I think for the ValueAndTimestamp serde, maybe we have some gaps in respecting the above rules, causing unexpected results. It's worth double-checking whether some fixes are needed.
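The four rules can be sketched compactly; this is an illustrative rendition with a simplified `Serde` interface, not the actual Streams internals:

```java
import java.nio.charset.StandardCharsets;

public class NullProtocolSketch {

    // Simplified stand-in for a Kafka serializer/deserializer pair.
    public interface Serde<T> {
        byte[] serialize(String topic, T data);
        T deserialize(String topic, byte[] bytes);
    }

    public static <T> byte[] serializeOrSkip(final Serde<T> inner, final String topic, final T value) {
        if (value == null) {
            return null;                       // rule 1: skip the serializer, emit a tombstone
        }
        return inner.serialize(topic, value);  // rule 2: null bytes here also mean tombstone
    }

    public static <T> T deserializeOrSkip(final Serde<T> inner, final String topic, final byte[] bytes) {
        if (bytes == null) {
            return null;                        // rule 3: skip the deserializer, return a null object
        }
        return inner.deserialize(topic, bytes); // rule 4: the result may still be a null object
    }

    // A trivial UTF-8 serde for demonstration.
    public static final Serde<String> UTF8 = new Serde<String>() {
        public byte[] serialize(final String topic, final String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
        public String deserialize(final String topic, final byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    public static void main(final String[] args) {
        System.out.println(serializeOrSkip(UTF8, "t", null));   // null (rule 1)
        System.out.println(deserializeOrSkip(UTF8, "t", null)); // null (rule 3)
    }
}
```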

@ncreep
Contributor Author

ncreep commented Jan 12, 2020

Thanks @mjsax and @guozhangwang for your responses.

In the logic described by @guozhangwang, rule 3 would break my use-case, since in the client code I want to convert the null values into a `StoredValue.empty()` instance; skipping the deserializer in this case will propagate null to the client code (I rely on the deserializer as a sort of "defense" against nulls). And indeed, in the meantime (between opening this issue and now) someone from my team stumbled upon this behavior, and it triggered an unexpected NullPointerException.
Is this behavior something that can still be discussed, or is it already set in stone?

The current issue is an instance of breaking rule 2. So if it's agreed that non-null objects can still be serialized to null, then my patch aligns ValueAndTimestampSerializer with this behavior.

@guozhangwang
Contributor

@ncreep Thanks for the detailed description of your use case.

As for 3): since it is a principle on the Kafka broker side that null bytes have specific, pre-defined semantics, Streams alone cannot change it; and if we want to change this, it has to be a big KIP discussion, as it may have a large impact.

As for 2): yes, I think we do need to fix the current behavior, and your PR looks good to me. cc @mjsax.

@ncreep
Contributor Author

ncreep commented Jan 14, 2020

Okay, thanks.
I'll drop 3) for now and will add tests for my current pull request (in the coming days, hopefully).

@mjsax
Member

mjsax commented Jan 14, 2020

I am still wondering (because of issue (3)) if we should allow it (i.e., if we allow it, don't we mask an actual issue)? If @ncreep's user code uses StoredValue.empty to represent null, would it not "break" Kafka Streams (i.e., change its behavior, as we might, for example, not write a proper tombstone and thus not delete something), and thus should it not be done at all?

In Kafka, nulls are a first-class citizen, and I would highly recommend not using StoredValue.empty, but rather writing code that handles null properly. My concern is really just that this pattern is an anti-pattern to begin with, and thus the current code that exposes this anti-pattern should not be changed. Or am I overthinking this case and there is no actual issue?

I am just not 100% sure if this change is "safe" or not atm. I think for plain consumers/producers users can do whatever they want, but in KS we should not allow serializing a non-null object to null (to be fair, atm we don't have checks for it, but we basically assume that this contract is met, don't we?). cc @guozhangwang

@guozhangwang
Contributor

The current PR only touches the serializer: when the inner serializer returns null bytes (indicating a delete tombstone), we skip wrapping the null bytes with the raw timestamp bytes (which would otherwise throw an NPE). I think this is the right fix to do.

Beyond that, I agree @ncreep's use case of serializing non-null objects to null bytes should be adjusted (as I recommended in my previous comment).

@mjsax
Member

mjsax commented Jan 22, 2020

> The current PR only touches the serializer: when the inner serializer returns null bytes (indicating a delete tombstone), we skip wrapping the null bytes with the raw timestamp bytes (which would otherwise throw an NPE). I think this is the right fix to do.

But this does (or should) never happen in Kafka Streams -- if we have a null value, we would never create a ValueAndTimestamp(null, <someTime>) object (in fact, that is not even possible: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java#L34); the valueAndTimestamp variable would be null, too. (We use the helpers `make` and `getValueOrNull` to ensure we always follow this pattern in our code: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java#L48-L51 and https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java#L61-L63.) Thus, when we actually need to serialize valueAndTimestamp, we see that it's null and skip the serializer completely.

The issue only happens because there is no null value; null is replaced with StoredValue.empty, and thus KS does not know it has a null at hand. Hence, we "incorrectly" create ValueAndTimestamp(StoredValue.empty, <someTime>) that should actually just be null instead.
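The make/getValueOrNull pattern described above can be sketched as follows; this is a stripped-down illustration of the null-to-null convention, not the actual code in ValueAndTimestamp.java:

```java
public class ValueAndTimestampSketch {

    public static final class ValueAndTimestamp<V> {
        public final V value;
        public final long timestamp;

        private ValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // A null value never yields a wrapper: the whole ValueAndTimestamp is null,
    // so downstream code can recognize the tombstone and skip serialization.
    public static <V> ValueAndTimestamp<V> make(final V value, final long timestamp) {
        return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
    }

    public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp) {
        return valueAndTimestamp == null ? null : valueAndTimestamp.value;
    }

    public static void main(final String[] args) {
        System.out.println(make(null, 5L) == null);        // true: no ValueAndTimestamp(null, ...)
        System.out.println(getValueOrNull(make("v", 5L))); // v
    }
}
```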

> Beyond that, I agree @ncreep's use case of serializing non-null objects to null bytes should be adjusted (as I recommended in my previous comment).

Hence, the violation of the null-to-null mapping pattern is (from my point of view) the root cause of the issue. My concern is that we would mask the root cause, which could lead to other problems in different parts of the code.

@ncreep
Contributor Author

ncreep commented Jan 27, 2020

@mjsax, ignoring for a moment my opinion about the validity of my use-case: I think that, as a client of the code being discussed, the error-reporting in this case makes it more difficult to debug the root cause. If you want to enforce the null contract you mentioned, then throwing a seemingly unintentional NullPointerException doesn't really help the user debug the cause. I think that a more intentional assertion would be appropriate here.

Back to me wanting my use-case to work: I would think that there should be a clear separation between the in-memory representation of a value and its serialized representation. It's a "coincidence" that null has the same representation in both cases, but this shouldn't dictate that the two representations obey the same contract. And so I could imagine KS supporting the separation between "actually null" and "null-serializable".

I can also report that, from my (non-extensive) experience, the StoredValue serializer I quoted above seems to work fine everywhere except when dealing with state stores, where I stumbled on the exception this PR tries to avoid.

@mjsax
Member

mjsax commented Jan 30, 2020

I agree that we could throw a more informative exception.

I also agree that it would be great to separate the representation of objects from their serialized format -- however, this is something that affects the whole system, not just Kafka Streams. Hence, if we want to change it, we need to start at the broker level: instead of representing tombstones as <key, null> messages, we would need a different way (for example, some binary flag) to represent deletes. Only after the broker supports this decoupling can we allow it in upper layers (which would still be difficult because Kafka Streams needs to be backward compatible with older brokers) -- it would also affect Kafka Connect.

Hence, don't take my objection to this PR the wrong way. I totally agree with you that the end-state you desire makes sense. I only disagree that we should merge this PR, as it does not get us closer to this end state but might only introduce subtle bugs that are even harder for end-users to debug.

@guozhangwang
Contributor

Like @mjsax mentioned, if we want to stop using null as a tombstone, we would have to change the whole Kafka eco-system (there's actually a KIP proposed for this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-87+-+Add+Compaction+Tombstone+Flag), i.e., use a special flag rather than a null value to indicate a tombstone.

Back to this PR: I think it still makes sense, to align with rule 2) (if a non-null object gets serialized into null bytes, it is considered a tombstone as well). My rationale is that

```java
serialize(final String topic,
          final V data,
          final long timestamp)
```

is a public interface, so in the future we may add other callers beyond its own overloaded serialize (but like I said, I agree it is less relevant to @ncreep's case, since he actually bumped into this via the other overloaded function) without knowing about this "trap". So adding this extra check seems reasonable to me, to make the code less future-vulnerable.

OR: if we think this function should not be public, we should just make it private, and then we do not need this check.

@LMnet
Contributor

LMnet commented Feb 3, 2020

I'm trying to achieve the same goal as @ncreep and I faced the same problem. I'm using Scala and I want to use Option to represent empty values in my streams. My Option serializer returns null for None values. Everything works as it should, except for this case. And I see no reason why a custom serializer can't return null for some values.

@mjsax
Member

mjsax commented Feb 4, 2020

Personally, I am not convinced, but I won't block this PR any longer. To repeat my original review comment: can we add a unit test for this case? (Please also add a comment explaining why we want/need to support this.)

@ncreep ncreep force-pushed the null-handling-in-value-timestamp-serializer branch from fd4750c to 4f248cb on February 4, 2020
@ncreep
Contributor Author

ncreep commented Feb 4, 2020

Thanks @mjsax and @guozhangwang for taking the time to discuss this pull request.

I've added tests and comments.

@guozhangwang guozhangwang merged commit bdd0a92 into apache:trunk Feb 5, 2020
@mjsax
Member

mjsax commented Feb 5, 2020

Thanks a lot @ncreep! Sorry for the long discussion and thanks a lot for contributing!

@guozhangwang Should we cherry-pick this to 2.5 branch?

ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 7, 2020
ijuma added a commit to ijuma/kafka that referenced this pull request Apr 28, 2020