Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() {
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
assertEquals("Fatal user code error in processing error callback", e.getMessage());
assertInstanceOf(NullPointerException.class, e.getCause());
assertEquals("Invalid ProductionExceptionHandler response.", e.getCause().getMessage());
assertEquals("Invalid ProcessingExceptionHandler response.", e.getCause().getMessage());
assertFalse(isExecuted.get());
}
}
Expand Down Expand Up @@ -524,15 +524,15 @@ public void shouldVerifySourceRawKeyAndSourceRawValuePresentOrNotInErrorHandlerC

public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
if (((String) record.key()).contains("FATAL")) {
throw new RuntimeException("KABOOM!");
}
if (((String) record.key()).contains("NULL")) {
return null;
}
assertProcessingExceptionHandlerInputs(context, record, exception);
return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
return Response.resume();
}

@Override
Expand All @@ -543,9 +543,9 @@ public void configure(final Map<String, ?> configs) {

public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
assertProcessingExceptionHandlerInputs(context, record, exception);
return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
return Response.fail();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ public TestHandler() { }
public void configure(final Map<String, ?> configs) { }

@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
public Response handleError(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
if (exception instanceof TimeoutException &&
exception.getCause() != null &&
exception.getCause() instanceof UnknownTopicOrPartitionException) {
return ProductionExceptionHandlerResponse.CONTINUE;
return Response.resume();
}
return ProductionExceptionHandler.super.handle(context, record, exception);
return ProductionExceptionHandler.super.handleError(context, record, exception);
}
}

Expand Down
10 changes: 10 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,11 @@ public class StreamsConfig extends AbstractConfig {
"support \"classic\" or \"streams\". If \"streams\" is specified, then the streams rebalance protocol will be " +
"used. Otherwise, the classic group protocol will be used.";

public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add unit tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :)


private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" +
"If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.";

/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" +
Expand Down Expand Up @@ -991,6 +996,11 @@ public class StreamsConfig extends AbstractConfig {
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC)
.define(MAX_TASK_IDLE_MS_CONFIG,
Type.LONG,
0L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,42 @@

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Map;

import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;

/**
* {@code ProductionExceptionHandler} that always instructs streams to fail when an exception
* happens while attempting to produce result records.
*/
public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
/**
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead.
*/
@SuppressWarnings("deprecation")
@Deprecated

private String deadLetterQueueTopic = null;

@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
public Response handleError(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return exception instanceof RetriableException ?
ProductionExceptionHandlerResponse.RETRY :
ProductionExceptionHandlerResponse.FAIL;
Response.retry() :
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}

@SuppressWarnings("rawtypes")
@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return exception instanceof RetriableException ?
ProductionExceptionHandlerResponse.RETRY :
ProductionExceptionHandlerResponse.FAIL;
public Response handleSerializationError(final ErrorHandlerContext context,
final ProducerRecord record,
Comment thread
lucasbru marked this conversation as resolved.
final Exception exception,
final SerializationExceptionOrigin origin) {
return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}


@Override
public void configure(final Map<String, ?> configs) {
// ignore
if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package org.apache.kafka.streams.errors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.Collections;
import java.util.List;

/**
* Interface that specifies how an exception from source node deserialization
* (e.g., reading from Kafka) should be handled.
Expand Down Expand Up @@ -63,16 +67,35 @@ default DeserializationHandlerResponse handle(final ProcessorContext context,
* The actual exception.
*
* @return Whether to continue or stop processing.
*
* @deprecated Use {@link #handleError(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@Deprecated
default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception);
}

/**
* Inspects a record and the exception received during deserialization.
*
* @param context
* Error handler context.
* @param record
* Record that failed deserialization.
* @param exception
* The actual exception.
*
* @return a {@link Response} object
*/
default Response handleError(final ErrorHandlerContext context, final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
return new Response(Result.from(handle(context, record, exception)), Collections.emptyList());
}
/**
* Enumeration that describes the response from the exception handler.
*/
@Deprecated
enum DeserializationHandlerResponse {
/** Continue processing. */
CONTINUE(0, "CONTINUE"),
Expand All @@ -95,4 +118,137 @@ enum DeserializationHandlerResponse {
}
}

/**
* Enumeration that describes the response from the exception handler.
*/
enum Result {
/** Continue processing. */
RESUME(0, "RESUME"),
/** Fail processing. */
FAIL(1, "FAIL");

/**
* An english description for the used option. This is for debugging only and may change.
*/
public final String name;

/**
* The permanent and immutable id for the used option. This can't change ever.
*/
public final int id;

Result(final int id, final String name) {
this.id = id;
this.name = name;
}

/**
* Converts the deprecated enum DeserializationHandlerResponse into the new Result enum.
*
* @param value the old DeserializationHandlerResponse enum value
* @return a {@link Result} enum value
* @throws IllegalArgumentException if the provided value does not map to a valid {@link Result}
*/
private static DeserializationExceptionHandler.Result from(final DeserializationHandlerResponse value) {
switch (value) {
case FAIL:
return Result.FAIL;
case CONTINUE:
return Result.RESUME;
default:
throw new IllegalArgumentException("No Result enum found for old value: " + value);
}
}
}

/**
* Represents the result of handling a deserialization exception.
* <p>
* The {@code Response} class encapsulates a {@link Result},
* indicating whether processing should continue or fail, along with an optional list of
* {@link ProducerRecord} instances to be sent to a dead letter queue.
* </p>
*/
class Response {

private final Result result;

private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;

/**
* Constructs a new {@code DeserializationExceptionResponse} object.
*
* @param result the result indicating whether processing should continue or fail;
* must not be {@code null}.
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
*/
private Response(final Result result,
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
this.result = result;
this.deadLetterQueueRecords = deadLetterQueueRecords;
}

/**
* Creates a {@code Response} indicating that processing should fail.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
*/
public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.FAIL, deadLetterQueueRecords);
}

/**
* Creates a {@code Response} indicating that processing should fail.
*
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
*/
public static Response fail() {
return fail(Collections.emptyList());
}

/**
* Creates a {@code Response} indicating that processing should continue.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
*/
public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.RESUME, deadLetterQueueRecords);
}

/**
* Creates a {@code Response} indicating that processing should continue.
*
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
*/
public static Response resume() {
return resume(Collections.emptyList());
}

/**
* Retrieves the deserialization handler result.
*
* @return the {@link Result} indicating whether processing should continue or fail.
*/
public Result result() {
return result;
}

/**
* Retrieves an unmodifiable list of records to be sent to the dead letter queue.
* <p>
* If the list is {@code null}, an empty list is returned.
* </p>
*
* @return an unmodifiable list of {@link ProducerRecord} instances
* for the dead letter queue, or an empty list if no records are available.
*/
public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
if (deadLetterQueueRecords == null) {
return Collections.emptyList();
}
return Collections.unmodifiableList(deadLetterQueueRecords);
}
}
}
Loading