Skip to content

KAFKA-16505: Adding dead letter queue in Kafka Streams#17942

Merged
lucasbru merged 16 commits intoapache:trunkfrom
Dabz:KAFKA-16505-Dead-letter-queue-in-Kafka-Streams
Jul 21, 2025
Merged

KAFKA-16505: Adding dead letter queue in Kafka Streams#17942
lucasbru merged 16 commits intoapache:trunkfrom
Dabz:KAFKA-16505-Dead-letter-queue-in-Kafka-Streams

Conversation

@Dabz
Copy link
Copy Markdown
Contributor

@Dabz Dabz commented Nov 25, 2024

First Pull Request to implement KIP-1034 to add support of Dead Letter
Queue in Kafka Streams. A second PR will store the source record key and
value byte[] in the context to add it to the DLQ records.

Reviewers: Lucas Brutschy lbrutschy@confluent.io, Bruno Cadonna
cadonna@apache.org

@Dabz Dabz force-pushed the KAFKA-16505-Dead-letter-queue-in-Kafka-Streams branch from 4fc7717 to 5698e60 Compare November 25, 2024 16:15
Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @Dabz !

I started to review it, but did not finish since I need to think about how this KIP uses mutable enums which I guess is an anti-pattern.
Nevertheless, I wanted to give my first comments.

public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." +
" The cluster must have a client metrics subscription which corresponds to a client.";

public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add unit tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot add a class to the public API that was not mentioned in the KIP.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this class to ExceptionHandlerUtils, make it packaged protected, and made all methods statics to avoid inheritance.

public static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset";


public boolean shouldBuildDeadLetterQueueRecord() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this public? It is only used within this class.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to package protected

return this;
}

public List<ProducerRecord<byte[], byte[]>> drainDeadLetterQueueRecords() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the KIP this should be named deadLetterQueueReords().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I renamed it

/**
* {@code CommonExceptionHandler} Contains utilities method that could be used by all exception handlers
*/
public class CommonExceptionHandler implements Configurable {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to define this class as a immutable utility class in the internal package that provides some utility methods to the default exception handlers. That means, no inheritance.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@lucasbru
Copy link
Copy Markdown
Member

lucasbru commented Dec 3, 2024

Indeed - using a single collection of dead letter records here that will shared across all stream threads doesn't seem like the right approach to me. Was this intended?

I'm not sure if there is a way to replace the enum by a class of some kind where we can properly define a non-static collection of records and remain binary compatible.

It seems to me that this needs a new design proposal and an updated KIP?

@cadonna
Copy link
Copy Markdown
Member

cadonna commented Dec 3, 2024

An option to fix this might be to add a new method deadLetterQueueRecords() to the handler interface that returns a list of records to add to the dead letter queue. By default the list is empty.

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @sebastienviale !

I did a first pass. Here my comments.

Comment on lines +62 to +67
return handle(record, exception);
throw new UnsupportedOperationException();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this still call the other deprecated method?
Imagine a user implemented

handle(final ProducerRecord<byte[], byte[]> record, final Exception exception)

but not

ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, 
                                          final ProducerRecord<byte[], byte[]> record,
                                          final Exception exception)

Streams would throw an UnsupportedOperationException although it did not before upgrading to this version.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be beneficial to add some unit tests that verify this redirection. With such unit tests, this removal would had happened without some thoughts about why the test failed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, I added a Unit Test that failed if an UnsupportedOperationException is thrown

/**
* Represents the result of handling a production exception.
* <p>
* The {@code Response} class encapsulates a {@link ProductionExceptionHandlerResponse},
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* The {@code Response} class encapsulates a {@link ProductionExceptionHandlerResponse},
* The {@code Response} class encapsulates a {@link Result},

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* {@link ProducerRecord} instances to be sent to a dead letter queue.
* </p>
*/
class Response {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some unit tests for this class?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

final ErrorHandlerContext context,
final Exception e) {
if (deadLetterQueueTopicName == null) {
throw new InvalidConfigurationException(String.format("%s can not be null while building DeadLetterQueue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new InvalidConfigurationException(String.format("%s can not be null while building DeadLetterQueue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
throw new InvalidConfigurationException(String.format("%s cannot be null while building dead letter queue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +74 to +78
static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName,
final byte[] key,
final byte[] value,
final ErrorHandlerContext context,
final Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName,
final byte[] key,
final byte[] value,
final ErrorHandlerContext context,
final Exception e) {
static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName,
final byte[] key,
final byte[] value,
final ErrorHandlerContext context,
final Exception e) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@SuppressWarnings("deprecation")
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove those deprecated handler methods. They are not called anywhere in the Streams code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Deprecated
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove this deprecated handler method. It is not called anywhere in the Streams code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public Response handleError(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.warn(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be log.error(). It is failing, not resuming.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@SuppressWarnings("deprecation")
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove those deprecated handler methods. They are not called anywhere in the Streams code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +39 to +40
public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
final Record<?, ?> record, final Exception exception) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove this deprecated handler method. It is not called anywhere in the Streams code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@sebastienviale sebastienviale force-pushed the KAFKA-16505-Dead-letter-queue-in-Kafka-Streams branch from 93cd94d to 93c831f Compare January 17, 2025 09:27
@sebastienviale
Copy link
Copy Markdown
Contributor

Thanks for the updates @sebastienviale !

I did a first pass. Here my comments.

@cadonna thanks for your review, I pushed changes

@sebastienviale sebastienviale force-pushed the KAFKA-16505-Dead-letter-queue-in-Kafka-Streams branch from fc4ac3c to efa1cae Compare February 25, 2025 12:36
@sebastienviale sebastienviale force-pushed the KAFKA-16505-Dead-letter-queue-in-Kafka-Streams branch from efa1cae to 739f443 Compare March 24, 2025 13:04
cadonna pushed a commit that referenced this pull request Jun 5, 2025
This PR is part of the KIP-1034.

It brings the support for the source raw key and the source raw
value in the `ErrorHandlerContext`. Required by the routing to DLQ implemented
by #17942.

Reviewers: Bruno Cadonna <cadonna@apache.org>

Co-authored-by: Damien Gasparina <d.gasparina@gmail.com>
@sebastienviale sebastienviale force-pushed the KAFKA-16505-Dead-letter-queue-in-Kafka-Streams branch from 22a8d80 to 2c32c96 Compare June 5, 2025 11:59
Mirai1129 pushed a commit to Mirai1129/kafka that referenced this pull request Jun 5, 2025
This PR is part of the KIP-1034.

It brings the support for the source raw key and the source raw
value in the `ErrorHandlerContext`. Required by the routing to DLQ implemented
by apache#17942.

Reviewers: Bruno Cadonna <cadonna@apache.org>

Co-authored-by: Damien Gasparina <d.gasparina@gmail.com>
@lucasbru lucasbru requested review from Copilot and lucasbru July 7, 2025 16:28
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Adds support for Dead Letter Queue (DLQ) in Kafka Streams by extending exception handler APIs and wiring DLQ record production throughout the core processing, deserialization, and production paths.

  • Introduced a new Response API (with deadLetterQueueRecords) for deserialization, processing, and production exception handlers.
  • Updated core classes (StreamTask, RecordDeserializer, RecordCollector, ProcessorNode) to emit DLQ records when configured.
  • Added ExceptionHandlerUtils for building DLQ records with appropriate headers and comprehensive tests validating DLQ behavior.

Reviewed Changes

Copilot reviewed 24 out of 24 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java Added overload of send to collect serialized records for DLQ.
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java Renamed handler methods, switched to Response type.
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java Switched to new Response API and added DLQ tests.
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java Switched to new Response API and added DLQ tests.
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java Switched to new Response API and added DLQ tests.
streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java New tests for DLQ record headers.
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java Added test for default DLQ config.
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java Integrated DLQ sends into processing error flow.
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java Integrated DLQ sends into deserialization error flow.
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java Integrated DLQ sends into production and serialization flows.
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java Added new send signature for DLQ.
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java Integrated DLQ sends into processing node error flow.
streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java New utility for building DLQ records with headers.
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java Introduced Response, deprecated old API, added DLQ support.
streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java Introduced Response, deprecated old API, added DLQ support.
streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java Added DLQ topic configuration and records.
streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java Added DLQ topic configuration and records.
streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java Added DLQ topic configuration and records.
streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java Added DLQ topic configuration and records.
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java Added DLQ topic configuration and records.
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java Defined new errors.dead.letter.queue.topic.name config.
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java Switched to new Response API.
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java Switched to new Response API.
Comments suppressed due to low confidence (1)

streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:1619

  • This test method is missing the @test annotation, so it will not be executed by the test runner.
    public void shouldSetDefaultDeadLetterQueue() {

Copy link
Copy Markdown
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I left a few questions for my own understanding, and a few nits. Mostly looking good to me, though!

Comment thread streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java Outdated
@sebastienviale sebastienviale force-pushed the KAFKA-16505-Dead-letter-queue-in-Kafka-Streams branch from c1c6eea to f1b8286 Compare July 17, 2025 07:49
@lucasbru lucasbru changed the title Kafka-16505: Adding dead letter queue in Kafka Streams KAFKA-16505: Adding dead letter queue in Kafka Streams Jul 18, 2025
Copy link
Copy Markdown
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@cadonna would you like to take another look?

@Dabz @sebastienviale I wonder if we could have a little integration tests for this feature, to test that it actually works end-to-end?

@cadonna
Copy link
Copy Markdown
Member

cadonna commented Jul 21, 2025

@cadonna would you like to take another look?

@lucasbru I trust you completely. 🙂

@lucasbru
Copy link
Copy Markdown
Member

I restarted the CI since it was triggering a flaky test.

I created https://issues.apache.org/jira/browse/KAFKA-16505 for tracking the integration test.

@lucasbru lucasbru merged commit cdc2d95 into apache:trunk Jul 21, 2025
36 of 38 checks passed
k-apol pushed a commit to k-apol/kafka that referenced this pull request Aug 8, 2025
Implements KIP-1034 to add support of Dead Letter
Queue in Kafka Streams. 

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna
 <cadonna@apache.org>
Co-authored-by: Sebastien Viale <sebastien.viale@michelin.com>
e.printStackTrace(stackTracePrintWriter);

try (final StringSerializer stringSerializer = new StringSerializer()) {
producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.toString()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KIP-1034 states that this filed is for the "Name of the thrown exception", so we should use e.getClass().getName() instead, right?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, shouldn't we include the headers from the origin record in the DLQ record? I noticed KIP-1034 states "Existing context headers are automatically forwarded into the new DLQ record"

Copy link
Copy Markdown
Member

@chia7712 chia7712 Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open #21370

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right there is a mismatch between the KIP and the implementation here.

lucasbru pushed a commit that referenced this pull request Jan 30, 2026
…21370)

This patch fixed two mismatch between

[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
and the actual implementation:

- Incorrect value of `HEADER_ERRORS_EXCEPTION_NAME`, it should be the
name of the thrown exception, not exception's String representation.
- Original headers from record that causes exception should be added to
dlq record headers.

References:
- #17942 (comment)
- #17942 (comment)
-

[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
unknowntpo added a commit to unknowntpo/kafka that referenced this pull request Jan 31, 2026
…pache#21370)

This patch fixed two mismatch between

[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
and the actual implementation:

- Incorrect value of `HEADER_ERRORS_EXCEPTION_NAME`, it should be the
name of the thrown exception, not exception's String representation.
- Original headers from record that causes exception should be added to
dlq record headers.

References:
- apache#17942 (comment)
- apache#17942 (comment)
-

[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
unknowntpo added a commit to unknowntpo/kafka that referenced this pull request Jan 31, 2026
…pache#21370)

This patch fixed two mismatch between

[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
and the actual implementation:

- Incorrect value of `HEADER_ERRORS_EXCEPTION_NAME`, it should be the
name of the thrown exception, not exception's String representation.
- Original headers from record that causes exception should be added to
dlq record headers.

References:
- apache#17942 (comment)
- apache#17942 (comment)
-

[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
chia7712 pushed a commit that referenced this pull request Jan 31, 2026
…21378)

This is a backport of PR #21370, with an additional checkstyle fix
(adding a `final` keyword to make checkstyle works in 4.2 branch).

---

This patch fixed two mismatch between


[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)
and the actual implementation:

- Incorrect value of `HEADER_ERRORS_EXCEPTION_NAME`, it should be the
name of the thrown exception, not exception's String representation.
- Original headers from record that causes exception should be added to
dlq record headers.

References:
- #17942 (comment)
- #17942 (comment) 
-
[KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-ProposedChanges)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants