Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.kafka.streams.errors;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
Expand All @@ -37,11 +37,27 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
* @deprecated Since 3.9. Please use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@Deprecated
default DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
throw new UnsupportedOperationException();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. KIP says NotImplementedException() -- we should update the KIP accordingly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done KIP has been updated

}

/**
* Inspect a record and the exception received.
*
* @param context error handler context
* @param record record that failed deserialization
* @param exception the actual exception
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception);
// Default bridge from the new ErrorHandlerContext-based API to the deprecated
// ProcessorContext-based overload, so existing handler implementations keep working.
// NOTE(review): unconditionally down-casts to DefaultErrorHandlerContext -- this assumes the
// Streams runtime only ever passes that implementation; a custom ErrorHandlerContext would
// fail with ClassCastException. TODO confirm this invariant is guaranteed by the runtime.
default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
// processorContext() is Optional; pass null to the old overload when no processor context is attached.
return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception);
}

/**
* Enumeration that describes the response from the exception handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,34 +89,6 @@ public interface ErrorHandlerContext {
*/
Headers headers();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
* to the repartition topic.
*
* @return the raw byte of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent
* to the repartition topic.
*
* @return the raw byte of the value of the source message
*/
byte[] sourceRawValue();

/**
* Return the current processor node ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);

@Deprecated
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
Expand All @@ -45,6 +46,19 @@ public DeserializationHandlerResponse handle(final ProcessorContext context,
return DeserializationHandlerResponse.CONTINUE;
}

/**
 * Handle the deserialization failure by logging it and telling the runtime to
 * skip the corrupted record and continue processing.
 *
 * @param context error handler context providing the failing task's metadata
 * @param record raw record that failed deserialization
 * @param exception the exception thrown during deserialization
 * @return always {@link DeserializationHandlerResponse#CONTINUE}
 */
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

// Warn (not error) because the record is intentionally dropped and processing continues.
log.warn("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);

return DeserializationHandlerResponse.CONTINUE;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl
private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);

@Override
@Deprecated
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
Expand All @@ -45,6 +46,19 @@ public DeserializationHandlerResponse handle(final ProcessorContext context,
return DeserializationHandlerResponse.FAIL;
}

/**
 * Handle the deserialization failure by logging it and telling the runtime to
 * fail, which stops processing and surfaces the error to the application.
 *
 * @param context error handler context providing the failing task's metadata
 * @param record raw record that failed deserialization
 * @param exception the exception thrown during deserialization
 * @return always {@link DeserializationHandlerResponse#FAIL}
 */
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

// Error level because this response aborts processing of the task.
log.error("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);

return DeserializationHandlerResponse.FAIL;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Optional;

/**
* Default implementation of {@link ErrorHandlerContext} that provides access to the metadata of the record that caused the error.
*/
Expand All @@ -28,27 +31,24 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final int partition;
private final long offset;
private final Headers headers;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;
private final String processorNodeId;
private final TaskId taskId;
private ProcessorContext processorContext;

public DefaultErrorHandlerContext(final String topic,
public DefaultErrorHandlerContext(final ProcessorContext processorContext,
final String topic,
final int partition,
final long offset,
final Headers headers,
final byte[] sourceRawKey,
final byte[] sourceRawValue,
final String processorNodeId,
final TaskId taskId) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.headers = headers;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
this.processorNodeId = processorNodeId;
this.taskId = taskId;
this.processorContext = processorContext;
}

@Override
Expand All @@ -71,16 +71,6 @@ public Headers headers() {
return headers;
}

@Override
public byte[] sourceRawKey() {
return sourceRawKey;
}

@Override
public byte[] sourceRawValue() {
return sourceRawValue;
}

@Override
public String processorNodeId() {
return processorNodeId;
Expand All @@ -90,4 +80,8 @@ public String processorNodeId() {
public TaskId taskId() {
return taskId;
}

// Expose the (possibly absent) old-style ProcessorContext so callers can bridge
// back to the deprecated handler API; empty when the error was raised outside a
// processor context (e.g. during global-state restore).
public Optional<ProcessorContext> processorContext() {
return Optional.ofNullable(processorContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class CorruptedRecord extends StampedRecord {

CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord);
super(rawRecord, ConsumerRecord.NO_TIMESTAMP);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
record.offset(),
record.partition(),
record.topic(),
record.headers(),
record);
record.headers());
globalProcessorContext.setRecordContext(recordContext);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
deserialized.offset(),
deserialized.partition(),
deserialized.topic(),
deserialized.headers(),
record);
deserialized.headers());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
final Record<Object, Object> toProcess = new Record<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public void process(final Record<KIn, VIn> record) {
context.offset(),
context.partition(),
context.topic(),
record.headers(),
processorRecordContext.rawRecord()
record.headers()
));
delegate.process(record.key(), record.value());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers(),
recordContext.rawRecord());
record.headers());
}

if (childName == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,11 @@ public void process(final Record<KIn, VIn> record) {
throw e;
} catch (final Exception e) {
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null,
internalProcessorContext.topic(),
internalProcessorContext.partition(),
internalProcessorContext.offset(),
internalProcessorContext.headers(),
internalProcessorContext.recordContext().rawRecord().key(),
internalProcessorContext.recordContext().rawRecord().value(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
Expand All @@ -38,28 +37,17 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final String topic;
private final int partition;
private final Headers headers;
private final ConsumerRecord<byte[], byte[]> rawRecord;

public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers) {
this(timestamp, offset, partition, topic, headers, null);
}

public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers,
final ConsumerRecord<byte[], byte[]> rawRecord) {
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.rawRecord = rawRecord;
}

@Override
Expand Down Expand Up @@ -87,10 +75,6 @@ public Headers headers() {
return headers;
}

public ConsumerRecord<byte[], byte[]> rawRecord() {
return rawRecord;
}

public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
Expand Down Expand Up @@ -189,7 +173,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
headers = new RecordHeaders(headerArr);
}

return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null);
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;

import org.slf4j.Logger;
Expand Down Expand Up @@ -69,7 +70,7 @@ ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processo
Optional.empty()
);
} catch (final Exception deserializationException) {
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor);
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name());
return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null'
}
}
Expand All @@ -80,12 +81,27 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
final ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor droppedRecordsSensor) {
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we still need this one? Can't we update GlobalStateManagerImpl to also pass in a sourceNode.name ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sebastienviale What about these comments? Merging the PR for now to make progress.

Copy link
Copy Markdown
Contributor

@loicgreffier loicgreffier Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax I've been looking for a way to update GlobalStateManagerImpl so we can remove this method. But it looks like we have no source node name in GlobalStateManagerImpl. There's a "source" processor:

But it doesn't look to be named. E.g., in the following scenario:

streamsBuilder
    .addGlobalStore(storeBuilder, "PERSON_TOPIC",
        Consumed.with(Serdes.String(), SerdesUtils.specificAvroValueSerdes()),
        () -> (Processor<String, SpecificRecord, Void, Void>) record -> log.info("Processing record with key = {}, value = {}", record.key(), record.value())); 
// Source var is this processor ⬆️

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the "helper" method also only calls the second one, setting sourceNodeName to null. Why can't we just call the second one from GlobalStateManagerImpl directly also setting it to null, and thus avoiding the need for the overload? The overload does not really add much value, or does it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Addressing it in #16746

}

public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler,
final ProcessorContext<?, ?> processorContext,
final Exception deserializationException,
final ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor droppedRecordsSensor,
final String sourceNodeName) {
final DeserializationExceptionHandler.DeserializationHandlerResponse response;
try {
response = deserializationExceptionHandler.handle(
final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
(InternalProcessorContext<?, ?>) processorContext,
rawRecord,
deserializationException);
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
rawRecord.headers(),
sourceNodeName,
processorContext.taskId());
response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException);
} catch (final Exception fatalUserException) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized now that you did not consider that the exception handler can also throw an exception for the processing exception handler (and I as the reviewer also missed it). Could you open a separate PR that catches exceptions from the processing exception handler and re-throws them as it is done here?
Sorry for not noticing it earlier!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna As processing exception handler handles exception at ProcessorNode, indeed exceptions during deserialization are not caught right now. Is it your point?

Maybe we should consider catching and handling exceptions when getting the nextRecord at StreamTask level right before processing it:

record = partitionGroup.nextRecord(recordInfo, wallClockTime);

Going to sleep on it

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, we misunderstood each other. On this line an exception originating from the user-specified handler is caught. The exception can be anything that is thrown by user code. We did not consider exceptions thrown from user code for the processing exception handler.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in this PR #16675

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in this PR #16300 for Punctuating part

log.error(
"Deserialization error callback failed after deserialization error for record {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private void updateHead() {
droppedRecordsSensor.record();
continue;
}
headRecord = new StampedRecord(deserialized, timestamp, raw);
headRecord = new StampedRecord(deserialized, timestamp);
headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public void process(final Record<KIn, VIn> record) {
context.offset(),
context.partition(),
context.topic(),
record.headers(),
context.recordContext().rawRecord()
record.headers()
);

final String topic = topicExtractor.extract(key, value, contextForExtraction);
Expand Down
Loading