Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@
* happens while attempting to produce result records.
*/
public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
/**
 * Previous-generation callback retained for binary/source compatibility:
 * always instructs Streams to stop processing by returning
 * {@code ProductionExceptionHandlerResponse.FAIL}.
 *
 * @param record the record that failed to produce
 * @param exception the exception thrown during the send attempt
 * @return always {@code ProductionExceptionHandlerResponse.FAIL}
 * @deprecated use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead
 */
@Deprecated
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return ProductionExceptionHandlerResponse.FAIL;
}

@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we can modify this without deprecating the old method and mentioning it in the KIP since it is part of the public API.
I am aware that this method is only called internally.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, we did it for the deserialization exception handler implementations. I'm wondering why we did not do the same for the production.. 😄

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna I updated KIP-1033 with a word regarding deserialization and production handler implementations

final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return ProductionExceptionHandlerResponse.FAIL;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,34 +89,6 @@ public interface ErrorHandlerContext {
*/
Headers headers();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
* to the repartition topic.
*
* @return the raw byte of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent
* to the repartition topic.
*
* @return the raw byte of the value of the source message
*/
byte[] sourceRawValue();

/**
* Return the current processor node ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,60 @@ public interface ProductionExceptionHandler extends Configurable {
*
* @param record The record that failed to produce
* @param exception The exception that occurred during production
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead.
*/
ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception);
@Deprecated
default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
throw new UnsupportedOperationException();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KIP says NotImplementedException -- we should update the KIP -- same for the other method below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KIP has been updated

}

/**
 * Inspect a record that we attempted to produce, and the exception that resulted
 * from attempting to produce it, and determine whether or not to continue processing.
 *
 * <p> The default implementation delegates to the deprecated
 * {@link #handle(ProducerRecord, Exception)} overload, so existing handlers that
 * only override the old method keep working unchanged.
 *
 * @param context The error handler context metadata
 * @param record The record that failed to produce
 * @param exception The exception that occurred during production
 * @return whether to continue or stop processing
 */
@SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return handle(record, exception);
}

/**
 * Handles serialization exception and determines if the process should continue. The default implementation is to
 * fail the process.
 *
 * @param record the record that failed to serialize
 * @param exception the exception that occurred during serialization
 * @return always {@code ProductionExceptionHandlerResponse.FAIL} in the default implementation
 * @deprecated Since 3.9. Use
 *             {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)}
 *             instead. (The previous tag pointed at {@code handle(...)}, which is not
 *             this method's replacement.)
 */
@Deprecated
default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
final Exception exception) {
return ProductionExceptionHandlerResponse.FAIL;
}

/**
 * Handles serialization exception and determines if the process should continue. The default implementation is to
 * fail the process.
 *
 * <p> The default implementation delegates to the deprecated
 * {@link #handleSerializationException(ProducerRecord, Exception)} overload (ignoring
 * {@code context} and {@code origin}), so existing handlers that only override the old
 * method keep working unchanged.
 *
 * @param context the error handler context metadata
 * @param record the record that failed to serialize
 * @param exception the exception that occurred during serialization
 * @param origin the origin of the serialization exception: key or value
 * @return whether to continue or stop processing
 */
@SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
final SerializationExceptionOrigin origin) {
return handleSerializationException(record, exception);
}

enum ProductionExceptionHandlerResponse {
/* continue processing */
CONTINUE(0, "CONTINUE"),
Expand All @@ -68,4 +106,11 @@ enum ProductionExceptionHandlerResponse {
this.name = name;
}
}

/**
 * Identifies which part of the record was being serialized when the
 * serialization exception was raised.
 */
enum SerializationExceptionOrigin {
/** serialization exception occurred during serialization of the key */
KEY,
/** serialization exception occurred during serialization of the value */
VALUE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,19 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final int partition;
private final long offset;
private final Headers headers;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;
private final String processorNodeId;
private final TaskId taskId;

/**
 * Captures an immutable snapshot of the record/processor metadata at the point
 * an error occurred; this class only stores the values it is given.
 *
 * <p> NOTE(review): the {@code sourceRawKey}/{@code sourceRawValue} parameters and
 * assignments below appear to be stale residue of this change — the corresponding
 * fields and accessors are removed elsewhere in the same diff. Confirm against the
 * final file; as rendered here they would not compile once the fields are gone.
 */
public DefaultErrorHandlerContext(final String topic,
final int partition,
final long offset,
final Headers headers,
final byte[] sourceRawKey,
final byte[] sourceRawValue,
final String processorNodeId,
final TaskId taskId) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.headers = headers;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
this.processorNodeId = processorNodeId;
this.taskId = taskId;
}
Expand All @@ -71,16 +65,6 @@ public Headers headers() {
return headers;
}

@Override
public byte[] sourceRawKey() {
return sourceRawKey;
}

@Override
public byte[] sourceRawValue() {
return sourceRawValue;
}

@Override
public String processorNodeId() {
return processorNodeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class CorruptedRecord extends StampedRecord {

/**
 * Wraps a raw consumer record whose key/value could not be deserialized.
 * A corrupted record carries no usable timestamp, so it is stamped with
 * {@link ConsumerRecord#NO_TIMESTAMP}.
 *
 * <p> Note: the rendered diff showed both the old three-argument and the new
 * two-argument {@code super(...)} calls (old/new lines of a one-line hunk);
 * only the new two-argument call is valid — two {@code super} calls cannot
 * coexist, and the {@code rawRecord} third argument was removed by this change.
 *
 * @param rawRecord the un-deserialized record as read from the topic
 */
CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
super(rawRecord, ConsumerRecord.NO_TIMESTAMP);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
record.offset(),
record.partition(),
record.topic(),
record.headers(),
record);
record.headers());
globalProcessorContext.setRecordContext(recordContext);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
deserialized.offset(),
deserialized.partition(),
deserialized.topic(),
deserialized.headers(),
record);
deserialized.headers());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
final Record<Object, Object> toProcess = new Record<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public void process(final Record<KIn, VIn> record) {
context.offset(),
context.partition(),
context.topic(),
record.headers(),
processorRecordContext.rawRecord()
record.headers()
));
delegate.process(record.key(), record.value());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers(),
recordContext.rawRecord());
record.headers());
}

if (childName == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ public void process(final Record<KIn, VIn> record) {
internalProcessorContext.partition(),
internalProcessorContext.offset(),
internalProcessorContext.headers(),
internalProcessorContext.recordContext().rawRecord().key(),
internalProcessorContext.recordContext().rawRecord().value(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
Expand All @@ -38,28 +37,17 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final String topic;
private final int partition;
private final Headers headers;
private final ConsumerRecord<byte[], byte[]> rawRecord;

public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers) {
this(timestamp, offset, partition, topic, headers, null);
}

public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers,
final ConsumerRecord<byte[], byte[]> rawRecord) {
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.rawRecord = rawRecord;
}

@Override
Expand Down Expand Up @@ -87,10 +75,6 @@ public Headers headers() {
return headers;
}

public ConsumerRecord<byte[], byte[]> rawRecord() {
return rawRecord;
}

public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
Expand Down Expand Up @@ -189,7 +173,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
headers = new RecordHeaders(headerArr);
}

return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null);
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
}

@Override
Expand Down
Loading