From 11c2573044abd09ea99e20b2e35aed0a4fca401e Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Tue, 23 Jul 2024 09:34:49 +0200 Subject: [PATCH 1/3] KFK-16448: Fix processing exception handler Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../LogAndFailProcessingExceptionHandler.java | 2 +- .../internals/DefaultErrorHandlerContext.java | 16 ++++---- .../internals/FailedProcessingException.java | 8 ++-- .../processor/internals/StreamTask.java | 40 ++++++++++--------- 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java index 47fdb09c9c207..9c2cf91c605c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java @@ -32,7 +32,7 @@ public class LogAndFailProcessingExceptionHandler implements ProcessingException @Override public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { - log.warn("Exception caught during message processing, " + + log.error("Exception caught during message processing, " + "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(), exception); diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index fc6b6048cb902..ff79860d77e30 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -53,41 +53,41 @@ public DefaultErrorHandlerContext(final String topic, @Override public String topic() { - return this.topic; + return topic; } @Override public int partition() { - return this.partition; + return partition; } @Override public long offset() { - return this.offset; + return offset; } @Override public Headers headers() { - return this.headers; + return headers; } @Override public byte[] sourceRawKey() { - return this.sourceRawKey; + return sourceRawKey; } @Override public byte[] sourceRawValue() { - return this.sourceRawValue; + return sourceRawValue; } @Override public String processorNodeId() { - return this.processorNodeId; + return processorNodeId; } @Override public TaskId taskId() { - return this.taskId; + return taskId; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java index 81b2a2d4fb1fb..25f2ae9f6cc09 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java @@ -16,16 +16,16 @@ */ package org.apache.kafka.streams.errors.internals; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.errors.StreamsException; /** * {@link FailedProcessingException} is the top-level exception type generated by Kafka Streams, and indicates errors have * occurred during a {@link org.apache.kafka.streams.processor.internals.ProcessorNode ProcessorNode's} processing. */ -public class FailedProcessingException extends KafkaException { +public class FailedProcessingException extends StreamsException { private static final long serialVersionUID = 1L; - public FailedProcessingException(final Throwable throwable) { - super(throwable); + public FailedProcessingException(final Exception exception) { + super(exception); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 52d40a34915c7..7b5a35a201804 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -800,28 +800,14 @@ record = null; record = null; throw new TaskCorruptedException(Collections.singleton(id)); } + } catch (final FailedProcessingException failedProcessingException) { + // Do not keep the failed processing exception in the stack trace + handleException(new Exception(failedProcessingException.getCause())); } catch (final StreamsException exception) { record = null; throw exception; } catch (final RuntimeException e) { - // Do not keep the failed processing exception in the stack trace - final Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e; - - final StreamsException error = new StreamsException( - String.format( - "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", - id(), - processorContext.currentNode().name(), - record.topic(), - record.partition(), - record.offset(), - getStacktraceString(processingException) - ), - processingException - ); - record = null; - - throw error; + handleException(e); } finally { processorContext.setCurrentNode(null); } @@ -829,6 +815,24 @@ record = null; return true; } + private void handleException(final Exception e) { + final StreamsException error = new StreamsException( + String.format( + "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", + id(), + processorContext.currentNode().name(), + record.topic(), + record.partition(), + record.offset(), + getStacktraceString(e) + ), + e + ); + record = null; + + throw error; + } + @SuppressWarnings("unchecked") private void doProcess(final long wallClockTime) { // process the record by passing to the source node of the topology From e9611074e9a4456bddd11020f2ba0ca9cd4263bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Wed, 24 Jul 2024 11:05:29 +0200 Subject: [PATCH 2/3] KFK-16448: Revert handleException parameter to Throwable Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../apache/kafka/streams/processor/internals/StreamTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7b5a35a201804..47229628e29b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -802,7 +802,7 @@ record = null; } } catch (final FailedProcessingException failedProcessingException) { // Do not keep the failed processing exception in the stack trace - handleException(new Exception(failedProcessingException.getCause())); + handleException(failedProcessingException.getCause()); } catch (final StreamsException exception) { record = null; throw exception; @@ -815,7 +815,7 @@ record = null; return true; } - private void handleException(final Exception e) { + private void handleException(final Throwable e) { final StreamsException error = new StreamsException( String.format( "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", From b73f7acf8c10b20121d0006c91d219c292eb51f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Wed, 24 Jul 2024 11:32:05 +0200 Subject: [PATCH 3/3] KFK-16448: Catch Exception instead of RuntimeException Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../apache/kafka/streams/processor/internals/StreamTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 47229628e29b6..30b9038aa6a67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -806,7 +806,7 @@ record = null; } catch (final StreamsException exception) { record = null; throw exception; - } catch (final RuntimeException e) { + } catch (final Exception e) { handleException(e); } finally { processorContext.setCurrentNode(null);