Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class LogAndFailProcessingExceptionHandler implements ProcessingException

@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
log.warn("Exception caught during message processing, " +
log.error("Exception caught during message processing, " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,41 +53,41 @@ public DefaultErrorHandlerContext(final String topic,

@Override
public String topic() {
return this.topic;
return topic;
}

@Override
public int partition() {
return this.partition;
return partition;
}

@Override
public long offset() {
return this.offset;
return offset;
}

@Override
public Headers headers() {
return this.headers;
return headers;
}

@Override
public byte[] sourceRawKey() {
return this.sourceRawKey;
return sourceRawKey;
}

@Override
public byte[] sourceRawValue() {
return this.sourceRawValue;
return sourceRawValue;
}

@Override
public String processorNodeId() {
return this.processorNodeId;
return processorNodeId;
}

@Override
public TaskId taskId() {
return this.taskId;
return taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package org.apache.kafka.streams.errors.internals;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.errors.StreamsException;

/**
* {@link FailedProcessingException} is the top-level exception type generated by Kafka Streams, and indicates errors have
* occurred during a {@link org.apache.kafka.streams.processor.internals.ProcessorNode ProcessorNode's} processing.
*/
public class FailedProcessingException extends KafkaException {
public class FailedProcessingException extends StreamsException {
private static final long serialVersionUID = 1L;

public FailedProcessingException(final Throwable throwable) {
super(throwable);
public FailedProcessingException(final Exception exception) {
super(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -800,35 +800,39 @@ record = null;
record = null;
throw new TaskCorruptedException(Collections.singleton(id));
}
} catch (final FailedProcessingException failedProcessingException) {
// Do not keep the failed processing exception in the stack trace
handleException(failedProcessingException.getCause());
} catch (final StreamsException exception) {
record = null;
throw exception;
} catch (final RuntimeException e) {
// Do not keep the failed processing exception in the stack trace
final Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e;

final StreamsException error = new StreamsException(
String.format(
"Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
id(),
processorContext.currentNode().name(),
record.topic(),
record.partition(),
record.offset(),
getStacktraceString(processingException)
),
processingException
);
record = null;

throw error;
} catch (final Exception e) {
handleException(e);
} finally {
processorContext.setCurrentNode(null);
}

return true;
}

private void handleException(final Throwable e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
private void handleException(final Throwable e) {
private void handleException(final Exception e) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sebastienviale found that failedProcessingException.getCause() returns a Throwable. I think casting would be overhead. Let's keep private void handleException(final Throwable e)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change private void handleException(final Throwable e) to private void handleException(final Exception e)? The additional wrapping into an Exception is not worth changing the signature. Please revert to private void handleException(final Throwable e) and pass in failedProcessingException.getCause().
See my comment above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted. Probably a mistake in a recent commit

final StreamsException error = new StreamsException(
String.format(
"Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
id(),
processorContext.currentNode().name(),
record.topic(),
record.partition(),
record.offset(),
getStacktraceString(e)
),
e
);
record = null;

throw error;
}

@SuppressWarnings("unchecked")
private void doProcess(final long wallClockTime) {
// process the record by passing to the source node of the topology
Expand Down