Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
* @deprecated Since 3.9. Use Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)}
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@Deprecated
default DeserializationHandlerResponse handle(final ProcessorContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext cont
*
* @param record the record that failed to serialize
* @param exception the exception that occurred during serialization
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead.
* @deprecated Since 3.9. Use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead.
*/
@Deprecated
default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ public TaskId taskId() {
return taskId;
}

@Override
public String toString() {
    // Headers are intentionally excluded so that user data is never logged by accident.
    final StringBuilder builder = new StringBuilder("ErrorHandlerContext{");
    builder.append("topic='").append(topic).append('\'');
    builder.append(", partition=").append(partition);
    builder.append(", offset=").append(offset);
    builder.append(", processorNodeId='").append(processorNodeId).append('\'');
    builder.append(", taskId=").append(taskId);
    builder.append('}');
    return builder.toString();
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Omitting headers on purpose, as we should not log user-data.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's late in the game to ask this, but what about a timestamp? Just a thought, I don't have a strong opinion here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Sounds like a miss... /cc @sebastienviale @loicgreffier

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side comment: I would not block this PR on this question, but do a follow up one, if we want to add ts.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep I agree it shouldn't block, adding a timestamp in a follow on PR is fine.


public Optional<ProcessorContext> processorContext() {
return Optional.ofNullable(processorContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
public class FailedProcessingException extends StreamsException {
private static final long serialVersionUID = 1L;

public FailedProcessingException(final String errorMessage, final Exception exception) {
super(errorMessage, exception);
}

public FailedProcessingException(final Exception exception) {
super(exception);
// we need to explicitly set `message` to `null` here
super(null, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
record.headers()));
restoreCount++;
}
} catch (final Exception deserializationException) {
} catch (final RuntimeException deserializationException) {
handleDeserializationFailure(
deserializationExceptionHandler,
globalProcessorContext,
Expand All @@ -330,7 +330,8 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
Thread.currentThread().getName(),
globalProcessorContext.taskId().toString(),
globalProcessorContext.metrics()
)
),
null
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup as discussed on some other PR.

);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void process(final Record<KIn, VIn> record) {
} catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) {
// Rethrow exceptions that should not be handled here
throw e;
} catch (final RuntimeException e) {
} catch (final RuntimeException processingException) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup: use proper variable names

final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
internalProcessorContext.topic(),
Expand All @@ -213,18 +214,26 @@ public void process(final Record<KIn, VIn> record) {
internalProcessorContext.taskId());

final ProcessingExceptionHandler.ProcessingHandlerResponse response;

try {
response = processingExceptionHandler.handle(errorHandlerContext, record, e);
} catch (final Exception fatalUserException) {
throw new FailedProcessingException(fatalUserException);
response = Objects.requireNonNull(
processingExceptionHandler.handle(errorHandlerContext, record, processingException),
"Invalid ProductionExceptionHandler response."
);
} catch (final RuntimeException fatalUserException) {
log.error(
"Processing error callback failed after processing error for record: {}",
errorHandlerContext,
processingException
);
throw new FailedProcessingException("Fatal user code error in processing error callback", fatalUserException);
}

if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the streaming pipeline" +
" continue after a processing error, please set the " +
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
throw new FailedProcessingException(e);
throw new FailedProcessingException(processingException);
} else {
droppedRecordsSensor.record();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
Expand All @@ -59,6 +60,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -208,7 +210,7 @@ public <K, V> void send(final String topic,
key,
keySerializer,
exception);
} catch (final Exception exception) {
} catch (final RuntimeException serializationException) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup: use proper variable names

handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
topic,
Expand All @@ -219,7 +221,7 @@ public <K, V> void send(final String topic,
timestamp,
processorNodeId,
context,
exception);
serializationException);
return;
}

Expand All @@ -232,7 +234,7 @@ public <K, V> void send(final String topic,
value,
valueSerializer,
exception);
} catch (final Exception exception) {
} catch (final RuntimeException serializationException) {
handleException(
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
topic,
Expand All @@ -243,7 +245,7 @@ public <K, V> void send(final String topic,
timestamp,
processorNodeId,
context,
exception);
serializationException);
return;
}

Expand Down Expand Up @@ -297,42 +299,51 @@ private <K, V> void handleException(final ProductionExceptionHandler.Serializati
final Long timestamp,
final String processorNodeId,
final InternalProcessorContext<Void, Void> context,
final Exception exception) {
final RuntimeException serializationException) {
log.debug(String.format("Error serializing record for topic %s", topic), serializationException);

final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
context.recordContext().topic(),
context.recordContext().partition(),
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
taskId
);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
final ProductionExceptionHandlerResponse response;

log.debug(String.format("Error serializing record to topic %s", topic), exception);

final ProductionExceptionHandlerResponse response;
try {
final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
context.recordContext().topic(),
context.recordContext().partition(),
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
taskId
response = Objects.requireNonNull(
productionExceptionHandler.handleSerializationException(errorHandlerContext, record, serializationException, origin),
"Invalid ProductionExceptionHandler response."
);
response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin);
} catch (final Exception e) {
log.error("Fatal when handling serialization exception", e);
recordSendError(topic, e, null, context, processorNodeId);
return;
} catch (final RuntimeException fatalUserException) {
log.error(
String.format(
"Production error callback failed after serialization error for record %s: %s",
origin.toString().toLowerCase(Locale.ROOT),
errorHandlerContext
),
serializationException
);
throw new FailedProcessingException("Fatal user code error in production error callback", fatalUserException);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: inside send() it seems incorrect to call recordSendError(), which we only use from the producer.send(..., Callback) path, which is invoked asynchronously. Here inside RecordCollector.send() (before we hit producer.send()) we should rather throw directly.

}

if (response == ProductionExceptionHandlerResponse.FAIL) {
throw new StreamsException(
String.format(
"Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]",
topic,
partition,
timestamp),
exception
topic,
partition,
timestamp),
serializationException
);
}

log.warn("Unable to serialize record, continue processing. " +
"ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])",
"ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])",
topic,
partition,
timestamp);
Expand Down Expand Up @@ -364,24 +375,24 @@ private <KV> StreamsException createStreamsExceptionForClassCastException(final
}

private void recordSendError(final String topic,
final Exception exception,
final Exception productionException,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup: use proper variable names

final ProducerRecord<byte[], byte[]> serializedRecord,
final InternalProcessorContext<Void, Void> context,
final String processorNodeId) {
String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception.toString());
String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, productionException.toString());

if (isFatalException(exception)) {
if (isFatalException(productionException)) {
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
sendException.set(new StreamsException(errorMessage, exception));
} else if (exception instanceof ProducerFencedException ||
exception instanceof InvalidPidMappingException ||
exception instanceof InvalidProducerEpochException ||
exception instanceof OutOfOrderSequenceException) {
sendException.set(new StreamsException(errorMessage, productionException));
} else if (productionException instanceof ProducerFencedException ||
productionException instanceof InvalidPidMappingException ||
productionException instanceof InvalidProducerEpochException ||
productionException instanceof OutOfOrderSequenceException) {
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
"indicating the task may be migrated out";
sendException.set(new TaskMigratedException(errorMessage, exception));
sendException.set(new TaskMigratedException(errorMessage, productionException));
} else {
if (isRetriable(exception)) {
if (isRetriable(productionException)) {
errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
"or the connection to broker was interrupted sending the request or receiving the response. " +
"\nConsider overwriting `max.block.ms` and /or " +
Expand All @@ -398,17 +409,34 @@ private void recordSendError(final String topic,
taskId
);

if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
final ProductionExceptionHandlerResponse response;
try {
response = Objects.requireNonNull(
productionExceptionHandler.handle(errorHandlerContext, serializedRecord, productionException),
"Invalid ProductionExceptionHandler response."
);
} catch (final RuntimeException fatalUserException) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new: handle exception from ProductionExceptionHandler callback

log.error(
"Production error callback failed after production error for record {}",
serializedRecord,
productionException
);
sendException.set(new FailedProcessingException("Fatal user code error in production error callback", fatalUserException));
return;
}

if (response == ProductionExceptionHandlerResponse.FAIL) {
errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
sendException.set(new StreamsException(errorMessage, exception));
sendException.set(new StreamsException(errorMessage, productionException));
} else {
errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
droppedRecordsSensor.record();
}

}
}

log.error(errorMessage, exception);
log.error(errorMessage, productionException);
}

/**
Expand Down
Loading