
KAFKA-16505: Add source raw key and value#18739

Merged
cadonna merged 1 commit into apache:trunk from Dabz:KAFKA-16505-RawKeyValue-Store-Cache
Jun 5, 2025

Conversation

@loicgreffier
Contributor

@loicgreffier loicgreffier commented Jan 29, 2025

This PR is part of
KIP-1034.

It adds support for the source raw key and the source raw value
in the ErrorHandlerContext, required by the routing to the DLQ implemented
by #17942.

Reviewers: Bruno Cadonna <cadonna@apache.org>

@github-actions github-actions bot added triage PRs from the community streams and removed triage PRs from the community labels Jan 29, 2025
Member

@cadonna cadonna left a comment


Thanks @loicgreffier for the PR!

You did not consider buffers in this PR as I described in my comment. Could you come up with a test that confirms that we also have the same issue with buffers and then provide a fix?


// As many records could be in-flight,
// freeing raw records in the context to reduce memory pressure
freeContext(context);
Member


The name of this method is a bit misleading. It basically frees the raw record within the context, not the whole context. What about calling it freeRawInputRecordFromContext()?

Contributor Author


Fixed

Comment on lines +84 to +93

@Override
public boolean equals(final Object other) {
return super.equals(other);
}

@Override
public int hashCode() {
return super.hashCode();
}
Member


Why are those needed?

Contributor Author


Since we added the new rawKey and rawValue attributes, SpotBugs requires the subclass to override the equals method (https://spotbugs.readthedocs.io/en/stable/bugDescriptions.html#eq-class-doesn-t-override-equals-in-superclass-eq-doesnt-override-equals)
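For context, SpotBugs flags a subclass that adds state without overriding equals, because two instances could then compare as equal while differing in the new fields. The following is a minimal illustration with hypothetical class and field names (not the actual Kafka Streams code, which delegates to super here); note that byte[] fields must be compared with Arrays.equals, since Objects.equals on arrays only checks reference identity:

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical sketch of a context class that adds raw key/value fields
// and includes them in equals()/hashCode(). byte[] fields need
// Arrays.equals/Arrays.hashCode, not Objects.equals/Objects.hash.
public class RecordContextWithRaw {
    private final long offset;
    private final byte[] rawKey;
    private final byte[] rawValue;

    public RecordContextWithRaw(final long offset, final byte[] rawKey, final byte[] rawValue) {
        this.offset = offset;
        this.rawKey = rawKey;
        this.rawValue = rawValue;
    }

    @Override
    public boolean equals(final Object other) {
        if (this == other) return true;
        if (!(other instanceof RecordContextWithRaw)) return false;
        final RecordContextWithRaw that = (RecordContextWithRaw) other;
        return offset == that.offset
            && Arrays.equals(rawKey, that.rawKey)
            && Arrays.equals(rawValue, that.rawValue);
    }

    @Override
    public int hashCode() {
        int result = Objects.hash(offset);
        result = 31 * result + Arrays.hashCode(rawKey);
        result = 31 * result + Arrays.hashCode(rawValue);
        return result;
    }
}
```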

Comment on lines +53 to +55
this.sourceRawKey = null;
this.sourceRawValue = null;
Member


You also need to add this information to serialize() and deserialize() so that the buffered values also get the source record. Here it gets a bit tricky, because you need to consider the case where a serialized record context does not contain the source record, because it was written by a version of Streams that did not yet have the source record in the context.

Contributor Author


Indeed, having "optional" raw key and value makes the deserialization tricky.

Let's say we serialize the ProcessorRecordContext in this order: timestamp, offset, topic, partition, headers, rawKey, rawValue. After deserializing the headers, the next bytes can be rawKey and rawValue, or they can be something else (e.g., priorValue

final byte[] priorValue = getNullableSizePrefixedArray(buffer);
)

Right now I'm considering serializing the rawKey and rawValue at the very end of the ByteBuffer (i.e., right after here:

). Thus, after deserializing all the non-optional fields, if there are bytes remaining in the buffer, they should be the rawKey and rawValue.

@loicgreffier
Contributor Author

@cadonna Changes about buffers will be added to this PR. However, despite my tests using suppress(), I did not manage to lose the rawKey and rawValue for now. I'm always receiving a value for these fields in the processingExceptionHandler.

@cadonna
Member

cadonna commented Jan 31, 2025

@cadonna Changes about buffers will be added to this PR. However, despite my tests using suppress(), I did not manage to lose the rawKey and rawValue for now. I'm always receiving a value for these fields in the processingExceptionHandler.

I believe, your test needs to flush the buffer so that the records are written to the changelog topic and then restore the buffer from the changelog topic by stopping and re-starting the app.

Useful code:

@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch from f14bd2e to 79f6395 Compare February 1, 2025 23:01
@loicgreffier loicgreffier changed the title KAFKA-16505: Fix source raw key and value in store caches KAFKA-16505: Fix lost source raw key and value in store caches and buffers Feb 1, 2025
@loicgreffier
Contributor Author

@cadonna Thank you for the guidance. I could trigger the InMemoryTimeOrderedKeyValueChangeBuffer#flush and InMemoryTimeOrderedKeyValueChangeBuffer#restoreBatch and confirm that the rawKey and rawValue are lost.

Please ignore my previous comment #18739 (comment) that brings too many changes.

I've updated the serialization and deserialization. To take the optional rawKey and rawValue into consideration, I've added a char "marker" (i.e., k for rawKey, v for rawValue) to identify the presence of these values without confusing them with any other possible bytes.

Let me know your thoughts about this approach
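The marker-byte idea described above can be sketched as follows. This is a simplified, self-contained illustration (class, method, and marker names are illustrative, not the actual Kafka code): mandatory fields come first, and each optional trailing field is prefixed by a one-byte marker and a length, so a buffer written by an older version simply runs out of bytes and the fields deserialize to null.

```java
import java.nio.ByteBuffer;

// Sketch of serializing optional trailing fields with marker bytes.
public class RawRecordSerde {

    static final byte RAW_KEY_MARKER = 'k';
    static final byte RAW_VALUE_MARKER = 'v';

    // Append the optional rawKey/rawValue after the mandatory payload,
    // each as: marker byte, 4-byte length, bytes.
    public static byte[] serialize(final byte[] mandatory, final byte[] rawKey, final byte[] rawValue) {
        int size = mandatory.length;
        if (rawKey != null) size += 1 + Integer.BYTES + rawKey.length;
        if (rawValue != null) size += 1 + Integer.BYTES + rawValue.length;
        final ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.put(mandatory);
        if (rawKey != null) {
            buffer.put(RAW_KEY_MARKER).putInt(rawKey.length).put(rawKey);
        }
        if (rawValue != null) {
            buffer.put(RAW_VALUE_MARKER).putInt(rawValue.length).put(rawValue);
        }
        return buffer.array();
    }

    // Called after the mandatory fields have been consumed: any remaining
    // bytes must start with a marker; old-format buffers have none left.
    public static byte[] readOptional(final ByteBuffer buffer, final byte marker) {
        if (!buffer.hasRemaining() || buffer.get(buffer.position()) != marker) {
            return null; // field absent, or written by an older version
        }
        buffer.get(); // consume the marker
        final byte[] field = new byte[buffer.getInt()];
        buffer.get(field);
        return field;
    }

    public static void main(final String[] args) {
        final byte[] mandatory = {1, 2, 3};
        final byte[] serialized = serialize(mandatory, "key".getBytes(), "value".getBytes());
        final ByteBuffer buffer = ByteBuffer.wrap(serialized);
        buffer.position(mandatory.length); // mandatory fields already consumed
        System.out.println(new String(readOptional(buffer, RAW_KEY_MARKER)));
        System.out.println(new String(readOptional(buffer, RAW_VALUE_MARKER)));

        // An old-format buffer (no trailing markers) yields null fields.
        final ByteBuffer old = ByteBuffer.wrap(mandatory);
        old.position(mandatory.length);
        System.out.println(readOptional(old, RAW_KEY_MARKER));
    }
}
```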

@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch 2 times, most recently from 630b392 to 93cdf62 Compare February 2, 2025 15:02
@cadonna
Member

cadonna commented Feb 5, 2025

@loicgreffier I discussed the need of writing raw key and raw value of the input record to the changelog topic for buffers with @mjsax and we had some concerns. Writing the input record to the changelog topic might significantly increase the storage requirements because we would need to write two records for each record in the buffer, the record itself and the corresponding input record.

@loicgreffier
Contributor Author

loicgreffier commented Feb 5, 2025

@cadonna I understand the concern. Should we restart the thread and discuss the possible alternatives?

  • Make the sourceRawKey/sourceRawValue a conscious trade against memory with a new configuration?
  • Drop the sourceRawKey/sourceRawValue and use the current record instead?

@cadonna
Member

cadonna commented Feb 24, 2025

@loicgreffier Sorry for the late reply!
Coincidentally, I talked to a user of Kafka Streams that implemented a dead letter queue. They have the requirement to write the raw input record to the dead letter queue. That means we have two data points from users (yours and this user's) that agree on the need for the input record in the dead letter queue.

The issue with the raw input record described above only applies to Streams applications that use suppress or stream-table joins with versioned tables and grace periods. IMO, we should not exclude the raw input record from the error handlers because of some applications. One option can be that writing the raw input record to the changelog topic is enabled by a configuration as you propose above. That means that error handlers need to gracefully handle the situation where the raw input record is not available.

WDYT?
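Under that option, an error handler has to treat the raw input record as optional. A minimal self-contained sketch of the graceful handling (the context interface here is a simplified stand-in for the parts of ErrorHandlerContext used, not the actual Streams API):

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for the accessors discussed above; the real
// ErrorHandlerContext exposes more than these two methods.
interface RawRecordContext {
    byte[] sourceRawKey();   // may be null, e.g. when the feature is disabled
    byte[] sourceRawValue(); // or when the record went through a buffer
}

public class DlqPayloadBuilder {
    // Builds a human-readable DLQ payload, degrading gracefully when the
    // raw input record is not available in the context.
    public static String describe(final RawRecordContext context) {
        final byte[] rawKey = context.sourceRawKey();
        final byte[] rawValue = context.sourceRawValue();
        if (rawKey == null && rawValue == null) {
            return "<raw input record not available>";
        }
        return "key=" + (rawKey == null ? "null" : new String(rawKey, StandardCharsets.UTF_8))
            + ", value=" + (rawValue == null ? "null" : new String(rawValue, StandardCharsets.UTF_8));
    }
}
```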

@loicgreffier
Contributor Author

@cadonna Agree to introduce a new parameter.

I'm wondering if the concern is the same for store caches. Should the parameter apply to store caches as well (CachingKeyValueStore, CachingSessionStore, CachingWindowStore)? Or should we consider that there is no impact at all?

@cadonna
Member

cadonna commented Mar 4, 2025

@loicgreffier

The first question we need to answer is the following:
Should the raw input record be (a) the record from which the defective record that triggered the handler was derived, or should the raw input record be (b) the record that triggered the defective record to be sent downstream?

If we decide for (a), we need to define the input record for the DSL operation. For example, what is the input record that should be written to the dead letter queue in case an aggregation result triggers the error handler? Intuitively, I would say it is the last record that contributed to the aggregation. However, there are cases where this might not be true. For example, if the aggregate is a list of accumulated records and the last record is not responsible for the error downstream. With (a), ideally, we need to carry the raw input record everywhere, into state stores, changelogs, caches, and buffers.
I checked the code. I believe we also have an issue with caches, since I realized that when an entry is added to the cache during a get operation, the record context is populated with sentinel values like -1 and null:

internalContext.cache().put(cacheName, key, new LRUCacheEntry(rawValue));

If we decide for (b), maintaining the raw input record might be a bit simpler, but I am not sure how useful the raw input record is in this case. You cannot really understand why something went wrong, nor replay the record.

Let me know what you think.

@loicgreffier
Contributor Author

@cadonna KIP-1034 suggests that the input record should be the one that triggers the sub-topology: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-DefaultDeadletterqueuerecord

(a) the record from which the defective record that triggered the handler was derived

(a) aligns with the definition in KIP-1034, doesn't it? It makes it easier to reprocess records.

(b) the record that triggered the defective record to be sent downstream?

Does this mean that in the following example:

.stream()
.selectKey() 
.mapValues() <----------- Exception here
...

The record sent to the DLQ would be the input record of the mapValues processor (i.e., the one passed to the processing error handler)?


Intuitively, I would say it is the last record that contributed to the aggregation.

.stream(INPUT_TOPIC)
.selectKey()
.groupByKey()
.aggregate(
  initializer,
  aggregator, <----------- Exception here
  materialized
)
...

With the current implementation:

  • The source record is the last record that contributed to the aggregation.
  • The source topic is the repartition topic.
  • The default DLQ topic, specified by errors.dead.letter.queue.topic.name, may contain records from different sub-topologies (e.g., records from INPUT_TOPIC or the repartition topic).

I believe, we have an issue also with caches since I realized that when an entry is added to the cache during a get operation

I will check and reproduce this case

@sebastienviale Feel free to add anything if I missed a point

@sebastienviale
Contributor

As @loicgreffier said, the KIP mentions that the raw record value: "If available, contains the value of the input message that triggered the sub-topology, null if triggered by punctuate"

This seems to correspond to solution (a). It also seems logical to store in the raw record the last record before the exception occurred.

@loicgreffier loicgreffier changed the title KAFKA-16505: Fix lost source raw key and value in store caches and buffers KAFKA-16505: Add source raw key and value in contexts Apr 28, 2025
@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch 2 times, most recently from 658e555 to 252df80 Compare April 28, 2025 18:33
@loicgreffier loicgreffier changed the title KAFKA-16505: Add source raw key and value in contexts KAFKA-16505: Add source raw key and value Apr 28, 2025
@loicgreffier
Contributor Author

loicgreffier commented Apr 28, 2025

Some tests done:

✅ Stateless

streamsBuilder
  .stream()
  .peek()
  .selectKey()
  .mapValues() // <----- Exception 💥
  • Source raw key and source raw value from the input topic.

✅ Aggregate Caching Enabled (Exception in Aggregator)

streamsBuilder
  .stream()
  .peek()
  .groupBy()
  .aggregate(
    () -> new KafkaUserAggregate(new ArrayList<>()),
    new UserAggregator(), // <----- Exception 💥
    Materialized.<String, KafkaUserAggregate, KeyValueStore<Bytes, byte[]>>as(USER_AGGREGATE_STORE)
        .withKeySerde(Serdes.String())
        .withValueSerde(SerdesUtils.getValueSerdes())
        .withCachingEnabled());
  • Source raw key and source raw value from the repartition topic.
  • Source raw key and source raw value of the latest record that enters the aggregator.

✅ Aggregate Caching Enabled (Exception after aggregation)

streamsBuilder
  .stream()
  .peek()
  .groupByKey()
  .aggregate(
    () -> new KafkaUserAggregate(new ArrayList<>()),
    new UserAggregator(),
    Materialized.<String, KafkaUserAggregate, KeyValueStore<Bytes, byte[]>>as(USER_AGGREGATE_STORE)
        .withKeySerde(Serdes.String())
        .withValueSerde(SerdesUtils.getValueSerdes())
        .withCachingEnabled())
  .mapValues() // <----- Exception 💥
  • Source raw key and source raw value from the repartition topic.
  • Source raw key and source raw value of the latest record that enters the aggregator.
  • ⚠️ Needs the updates on CachingKeyValueStore.java, CachingSessionStore.java, CachingWindowStore.java. Otherwise,
    source raw key and source raw value are null.

✅ Table Caching Enabled

streamsBuilder
  .table(
    USER_TOPIC,
    Consumed.with(Serdes.String(), SerdesUtils.getValueSerdes()),
    Materialized.<String, KafkaUser, KeyValueStore<Bytes, byte[]>>as("USER_STORE")
        .withKeySerde(Serdes.String())
        .withValueSerde(SerdesUtils.getValueSerdes())
        .withCachingEnabled())
  .mapValues(); // <----- Exception 💥
  • Source raw key and source raw value from the input topic.
  • ⚠️ Needs the updates on CachingKeyValueStore.java, CachingSessionStore.java, CachingWindowStore.java. Otherwise,
    source raw key and source raw value are null (like above).

✅ Left Join Stream-Stream

streamsBuilder
  .stream()
  .selectKey()
  .leftJoin(streamTwo, (key, left, right) -> { ... // <----- Exception 💥});
  • Source raw key and source raw value from the left repartition topic.

Member

@cadonna cadonna left a comment


Thanks for the updates @loicgreffier !

Here is my feedback.

We are almost there.

Could you add tests (or maybe only additional assertions in existing tests are needed) for the cases you pointed out above?

*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
Member


nit:

Suggested change
* callback, it will return null.
* callback, it will return {@code null}.

Contributor Author


Fixed

*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
Member


nit:

Suggested change
* callback, it will return null.
* callback, it will return {@code null}.

Contributor Author


Fixed

*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
Member


nit:

Suggested change
* callback, it will return null.
* callback, it will return {@code null}.

Contributor Author


Fixed

*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return null.
Member


nit:

Suggested change
* callback, it will return null.
* callback, it will return {@code null}.

Contributor Author


Fixed

Comment on lines +123 to +124
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
Member


Is this method ever called in ProductionExceptionHandler.handle()? Is this a copy&paste mistake from ErrorHandlerContext?

Contributor Author

@loicgreffier loicgreffier Jun 3, 2025


Only the ErrorHandlerContext is given to ProductionExceptionHandler#handle, so this is a bad copy & paste. I've updated both the code and the KIP.

Comment on lines +140 to +141
* <p> Always returns null if this method is invoked within a
* ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
Member


Is this method ever called in ProductionExceptionHandler.handle()? Is this a copy&paste mistake from ErrorHandlerContext?

Contributor Author


);

collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner);
collector.flush();
Member


Why do you flush before the assertion?
Could you assert before flush?

Contributor Author


Flush was not necessary in the test, it has been removed.

@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch 2 times, most recently from 813635e to cddb761 Compare June 2, 2025 22:18
@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch from 03255d3 to 0bd6a7d Compare June 3, 2025 22:00
@loicgreffier
Contributor Author

@cadonna

  • I've mainly updated ProcessingExceptionHandlerIntegrationTest.java to assert the presence of the source raw key and source raw value.
  • I’ve added a new parameterized test shouldVerifySourceRawKeyAndSourceRawValuePresentOrNotInErrorHandlerContext that verifies whether these fields are correctly set in the error handler context across a variety of topologies.

Please let me know if you think this approach is not the appropriate way to test it, or if you believe additional test cases should be included.

Member

@cadonna cadonna left a comment


@loicgreffier Thanks for the updates!

LGTM!

@cadonna
Member

cadonna commented Jun 5, 2025

@loicgreffier Could you please add a description to the PR and remove the template text?

@loicgreffier
Contributor Author

@cadonna Done. I've updated the description.

@cadonna cadonna merged commit 3edb406 into apache:trunk Jun 5, 2025
35 checks passed
Mirai1129 pushed a commit to Mirai1129/kafka that referenced this pull request Jun 5, 2025
This PR is part of the KIP-1034.

It brings the support for the source raw key and the source raw
value in the `ErrorHandlerContext`. Required by the routing to DLQ implemented
by apache#17942.

Reviewers: Bruno Cadonna <cadonna@apache.org>

Co-authored-by: Damien Gasparina <d.gasparina@gmail.com>