From e08da1a96f4cb9dca3c0c5846f3d0049e8cdfc28 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 31 Jul 2024 00:28:10 -0700 Subject: [PATCH 1/5] KAFKA-16448: Unify error-callback exception handling --- .../errors/ProductionExceptionHandler.java | 2 +- .../internals/DefaultErrorHandlerContext.java | 12 + .../internals/FailedProcessingException.java | 7 +- .../internals/GlobalStateManagerImpl.java | 5 +- .../processor/internals/ProcessorNode.java | 21 +- .../internals/RecordCollectorImpl.java | 98 +++++--- .../internals/RecordDeserializer.java | 50 ++--- .../processor/internals/StreamTask.java | 47 ++-- ...essingExceptionHandlerIntegrationTest.java | 30 +-- .../internals/RecordCollectorTest.java | 182 +++++++++++---- .../internals/RecordDeserializerTest.java | 210 ++++++++++++------ .../processor/internals/StreamTaskTest.java | 140 ++++++++---- 12 files changed, 563 insertions(+), 241 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 25aa00f7a9279..939b1ecbcd682 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -59,7 +59,7 @@ default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext cont * * @param record the record that failed to serialize * @param exception the exception that occurred during serialization - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. + * @deprecated Since 3.9. Use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead. 
*/ @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index c907ff3eb89ae..77500ce3c3607 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -81,6 +81,18 @@ public TaskId taskId() { return taskId; } + @Override + public String toString() { + // we do exclude headers on purpose, to not accidentally log user data + return "ErrorHandlerContext{" + + "topic='" + topic + '\'' + + ", partition=" + partition + + ", offset=" + offset + + ", processorNodeId='" + processorNodeId + '\'' + + ", taskId=" + taskId + + '}'; + } + public Optional processorContext() { return Optional.ofNullable(processorContext); } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java index 25f2ae9f6cc09..03d687d2687ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java @@ -25,7 +25,12 @@ public class FailedProcessingException extends StreamsException { private static final long serialVersionUID = 1L; + public FailedProcessingException(final String errorMessage, final Exception exception) { + super(errorMessage, exception); + } + public FailedProcessingException(final Exception exception) { - super(exception); + // we need to explicitly set `message` to `null` here + super(null, exception); } } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 6b7214a9ed185..12d4c6c603dcc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -319,7 +319,7 @@ private void reprocessState(final List topicPartitions, record.headers())); restoreCount++; } - } catch (final Exception deserializationException) { + } catch (final RuntimeException deserializationException) { handleDeserializationFailure( deserializationExceptionHandler, globalProcessorContext, @@ -330,7 +330,8 @@ private void reprocessState(final List topicPartitions, Thread.currentThread().getName(), globalProcessorContext.taskId().toString(), globalProcessorContext.metrics() - ) + ), + null ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 3ccfcf24905b0..07cce0730ee90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; @@ -202,7 +203,7 @@ public void process(final Record record) { } catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) { // Rethrow exceptions that should not be handled here throw e; - } catch (final RuntimeException e) { + } catch (final RuntimeException processingException) { final ErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext( null, // only required to pass for DeserializationExceptionHandler internalProcessorContext.topic(), @@ -213,18 +214,26 @@ public void process(final Record record) { internalProcessorContext.taskId()); final ProcessingExceptionHandler.ProcessingHandlerResponse response; - try { - response = processingExceptionHandler.handle(errorHandlerContext, record, e); - } catch (final Exception fatalUserException) { - throw new FailedProcessingException(fatalUserException); + response = Objects.requireNonNull( + processingExceptionHandler.handle(errorHandlerContext, record, processingException), + "Invalid ProcessingExceptionHandler response." + ); + } catch (final RuntimeException fatalUserException) { + log.error( + "Processing error callback failed after processing error for record: {}", + errorHandlerContext, + processingException + ); + throw new FailedProcessingException("Fatal user code error in processing error callback", fatalUserException); } + if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { log.error("Processing exception handler is set to fail upon" + " a processing error. 
If you would rather have the streaming pipeline" + " continue after a processing error, please set the " + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately."); - throw new FailedProcessingException(e); + throw new FailedProcessingException(processingException); } else { droppedRecordsSensor.record(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index a4dc0a68062ca..86e1ebc1be9ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -45,6 +45,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; +import org.apache.kafka.streams.errors.internals.FailedProcessingException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -59,6 +60,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -208,7 +210,7 @@ public void send(final String topic, key, keySerializer, exception); - } catch (final Exception exception) { + } catch (final RuntimeException serializationException) { handleException( ProductionExceptionHandler.SerializationExceptionOrigin.KEY, topic, @@ -219,7 +221,7 @@ public void send(final String topic, timestamp, processorNodeId, context, - exception); + serializationException); return; } @@ -232,7 +234,7 @@ public void send(final String topic, value, valueSerializer, exception); - } catch (final Exception 
exception) { + } catch (final RuntimeException serializationException) { handleException( ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, topic, @@ -243,7 +245,7 @@ public void send(final String topic, timestamp, processorNodeId, context, - exception); + serializationException); return; } @@ -297,27 +299,36 @@ private void handleException(final ProductionExceptionHandler.Serializati final Long timestamp, final String processorNodeId, final InternalProcessorContext context, - final Exception exception) { + final RuntimeException serializationException) { + log.debug(String.format("Error serializing record for topic %s", topic), serializationException); + + final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + null, // only required to pass for DeserializationExceptionHandler + context.recordContext().topic(), + context.recordContext().partition(), + context.recordContext().offset(), + context.recordContext().headers(), + processorNodeId, + taskId + ); final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); - final ProductionExceptionHandlerResponse response; - - log.debug(String.format("Error serializing record to topic %s", topic), exception); + final ProductionExceptionHandlerResponse response; try { - final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( - null, // only required to pass for DeserializationExceptionHandler - context.recordContext().topic(), - context.recordContext().partition(), - context.recordContext().offset(), - context.recordContext().headers(), - processorNodeId, - taskId + response = Objects.requireNonNull( + productionExceptionHandler.handleSerializationException(errorHandlerContext, record, serializationException, origin), + "Invalid ProductionExceptionHandler response." 
); - response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin); - } catch (final Exception e) { - log.error("Fatal when handling serialization exception", e); - recordSendError(topic, e, null, context, processorNodeId); - return; + } catch (final RuntimeException fatalUserException) { + log.error( + String.format( + "Production error callback failed after serialization error for record %s: %s", + origin.toString().toLowerCase(Locale.ROOT), + errorHandlerContext + ), + serializationException + ); + throw new FailedProcessingException("Fatal user code error in production error callback", fatalUserException); } if (response == ProductionExceptionHandlerResponse.FAIL) { @@ -327,7 +338,7 @@ private void handleException(final ProductionExceptionHandler.Serializati topic, partition, timestamp), - exception + serializationException ); } @@ -364,24 +375,24 @@ private StreamsException createStreamsExceptionForClassCastException(final } private void recordSendError(final String topic, - final Exception exception, + final Exception productionException, final ProducerRecord serializedRecord, final InternalProcessorContext context, final String processorNodeId) { - String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception.toString()); + String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, productionException.toString()); - if (isFatalException(exception)) { + if (isFatalException(productionException)) { errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error."; - sendException.set(new StreamsException(errorMessage, exception)); - } else if (exception instanceof ProducerFencedException || - exception instanceof InvalidPidMappingException || - exception instanceof InvalidProducerEpochException || - exception instanceof OutOfOrderSequenceException) { + sendException.set(new StreamsException(errorMessage, 
productionException)); + } else if (productionException instanceof ProducerFencedException || + productionException instanceof InvalidPidMappingException || + productionException instanceof InvalidProducerEpochException || + productionException instanceof OutOfOrderSequenceException) { errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " + "indicating the task may be migrated out"; - sendException.set(new TaskMigratedException(errorMessage, exception)); + sendException.set(new TaskMigratedException(errorMessage, productionException)); } else { - if (isRetriable(exception)) { + if (isRetriable(productionException)) { errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " + "or the connection to broker was interrupted sending the request or receiving the response. " + "\nConsider overwriting `max.block.ms` and /or " + @@ -398,17 +409,34 @@ private void recordSendError(final String topic, taskId ); - if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { + final ProductionExceptionHandlerResponse response; + try { + response = Objects.requireNonNull( + productionExceptionHandler.handle(errorHandlerContext, serializedRecord, productionException), + "Invalid ProductionExceptionHandler response." 
+ ); + } catch (final RuntimeException fatalUserException) { + log.error( + "Production error callback failed after production error for record {}", + serializedRecord, + productionException + ); + sendException.set(new FailedProcessingException("Fatal user code error in production error callback", fatalUserException)); + return; + } + + if (response == ProductionExceptionHandlerResponse.FAIL) { errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent."; - sendException.set(new StreamsException(errorMessage, exception)); + sendException.set(new StreamsException(errorMessage, productionException)); } else { errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded."; droppedRecordsSensor.record(); } + } } - log.error(errorMessage, exception); + log.error(errorMessage, productionException); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 8ee2dc014ebc6..5fc03352ecc53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -21,12 +21,14 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.slf4j.Logger; +import java.util.Objects; import java.util.Optional; import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; @@ -49,7 +51,7 @@ public class RecordDeserializer { /** * @throws StreamsException if a deserialization error occurs and the deserialization callback returns - * {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL} + * {@link DeserializationHandlerResponse#FAIL FAIL} * or throws an exception itself */ ConsumerRecord deserialize(final ProcessorContext processorContext, @@ -69,7 +71,7 @@ ConsumerRecord deserialize(final ProcessorContext processo rawRecord.headers(), Optional.empty() ); - } catch (final Exception deserializationException) { + } catch (final RuntimeException deserializationException) { handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name()); return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null' } @@ -77,39 +79,37 @@ ConsumerRecord deserialize(final ProcessorContext processo public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler, final ProcessorContext processorContext, - final Exception deserializationException, - final ConsumerRecord rawRecord, - final Logger log, - final Sensor droppedRecordsSensor) { - handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null); - } - - public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler, - final ProcessorContext processorContext, - final Exception deserializationException, + final RuntimeException deserializationException, final ConsumerRecord rawRecord, final Logger log, final Sensor droppedRecordsSensor, final String sourceNodeName) { - final 
DeserializationExceptionHandler.DeserializationHandlerResponse response; + + final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + (InternalProcessorContext) processorContext, + rawRecord.topic(), + rawRecord.partition(), + rawRecord.offset(), + rawRecord.headers(), + sourceNodeName, + processorContext.taskId()); + + final DeserializationHandlerResponse response; try { - final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( - (InternalProcessorContext) processorContext, - rawRecord.topic(), - rawRecord.partition(), - rawRecord.offset(), - rawRecord.headers(), - sourceNodeName, - processorContext.taskId()); - response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException); - } catch (final Exception fatalUserException) { + response = Objects.requireNonNull( + deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException), + "Invalid DeserializationExceptionHandler response." + ); + } catch (final RuntimeException fatalUserException) { log.error( "Deserialization error callback failed after deserialization error for record {}", rawRecord, - deserializationException); + deserializationException + ); throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException); } - if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) { + + if (response == DeserializationHandlerResponse.FAIL) { throw new StreamsException("Deserialization exception handler is set to fail upon" + " a deserialization error. 
If you would rather have the streaming pipeline" + " continue after a deserialization error, please set the " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6f2edd442b049..c3a5c59c7753b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -59,6 +59,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -807,7 +808,7 @@ record = null; } } catch (final FailedProcessingException failedProcessingException) { // Do not keep the failed processing exception in the stack trace - handleException(failedProcessingException.getCause()); + handleException(failedProcessingException.getMessage(), failedProcessingException.getCause()); } catch (final StreamsException exception) { record = null; throw exception; @@ -820,19 +821,25 @@ record = null; return true; } - private void handleException(final Throwable e) { - final StreamsException error = new StreamsException( + private void handleException(final Throwable originalException) { + handleException( String.format( - "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", + "Exception caught in process. 
taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", id(), processorContext.currentNode().name(), record.topic(), record.partition(), - record.offset(), - getStacktraceString(e) + record.offset() ), - e - ); + originalException); + } + + private void handleException(final String errorMessage, final Throwable originalException) { + if (errorMessage == null) { + handleException(originalException); + } + + final StreamsException error = new StreamsException(errorMessage, originalException); record = null; throw error; @@ -920,11 +927,18 @@ public void punctuate(final ProcessorNode node, try { maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor); + } catch (final TimeoutException timeoutException) { + if (!eosEnabled) { + throw timeoutException; + } else { + record = null; + throw new TaskCorruptedException(Collections.singleton(id)); + } } catch (final FailedProcessingException e) { throw createStreamsException(node.name(), e.getCause()); } catch (final TaskCorruptedException | TaskMigratedException e) { throw e; - } catch (final Exception e) { + } catch (final RuntimeException processingException) { final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( null, recordContext.topic(), @@ -936,11 +950,18 @@ public void punctuate(final ProcessorNode node, ); final ProcessingExceptionHandler.ProcessingHandlerResponse response; - try { - response = processingExceptionHandler.handle(errorHandlerContext, null, e); + response = Objects.requireNonNull( + processingExceptionHandler.handle(errorHandlerContext, null, processingException), + "Invalid ProcessingExceptionHandler response." 
+ ); } catch (final Exception fatalUserException) { - throw new FailedProcessingException(fatalUserException); + log.error( + "Processing error callback failed after processing error for record: {}", + errorHandlerContext, + processingException + ); + throw new FailedProcessingException("Fatal user code error in processing error callback", fatalUserException); } if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { @@ -949,7 +970,7 @@ public void punctuate(final ProcessorNode node, " continue after a processing error, please set the " + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately."); - throw createStreamsException(node.name(), e); + throw createStreamsException(node.name(), processingException); } else { droppedRecordsSensor.record(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index 61b5ed16bb1ef..f3c4471e64522 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -51,6 +51,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -61,7 +62,7 @@ public class ProcessingExceptionHandlerIntegrationTest { private final String threadId = Thread.currentThread().getName(); @Test - public void shouldFailWhenProcessingExceptionOccurs() { + public void 
shouldFailWhenProcessingExceptionOccursIfExceptionHandlerReturnFail() { final List> events = Arrays.asList( new KeyValue<>("ID123-1", "ID123-A1"), new KeyValue<>("ID123-2-ERR", "ID123-A2"), @@ -93,8 +94,7 @@ public void shouldFailWhenProcessingExceptionOccurs() { assertTrue(exception.getMessage().contains("Exception caught in process. " + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, " - + "partition=0, offset=1, stacktrace=java.lang.RuntimeException: " - + "Exception should be handled by processing exception handler")); + + "partition=0, offset=1")); assertEquals(1, processor.theCapturedProcessor().processed().size()); assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed()); @@ -107,7 +107,7 @@ public void shouldFailWhenProcessingExceptionOccurs() { } @Test - public void shouldContinueWhenProcessingExceptionOccurs() { + public void shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnContinue() { final List> events = Arrays.asList( new KeyValue<>("ID123-1", "ID123-A1"), new KeyValue<>("ID123-2-ERR", "ID123-A2"), @@ -182,8 +182,7 @@ public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProces final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); assertTrue(e.getMessage().contains("Exception caught in process. 
" + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, " - + "partition=0, offset=1, stacktrace=java.lang.RuntimeException: " - + "Exception should be handled by processing exception handler")); + + "partition=0, offset=1")); assertFalse(isExecuted.get()); } } @@ -222,9 +221,9 @@ public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinuePr } @Test - public void shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionHandler() { + public void shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() { final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); - final KeyValue eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2"); + final KeyValue eventError = new KeyValue<>("ID123-ERR-NULL", "ID123-A2"); final MockProcessorSupplier processor = new MockProcessorSupplier<>(); final StreamsBuilder builder = new StreamsBuilder(); @@ -241,7 +240,7 @@ public void shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionH .process(processor); final Properties properties = new Properties(); - properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); @@ -250,13 +249,15 @@ public void shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionH assertTrue(isExecuted.get()); isExecuted.set(false); final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); - assertEquals("KABOOM!", e.getCause().getMessage()); + assertEquals("Fatal user code error in processing error callback", e.getMessage()); + 
assertInstanceOf(NullPointerException.class, e.getCause()); + assertEquals("Invalid ProcessingExceptionHandler response.", e.getCause().getMessage()); assertFalse(isExecuted.get()); } } @Test - public void shouldStopProcessingWhenFatalUserExceptionInContinueProcessingExceptionHandler() { + public void shouldStopProcessingWhenFatalUserExceptionProcessingExceptionHandler() { final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); final KeyValue eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2"); @@ -284,6 +285,7 @@ public void shouldStopProcessingWhenFatalUserExceptionInContinueProcessingExcept assertTrue(isExecuted.get()); isExecuted.set(false); final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); + assertEquals("Fatal user code error in processing error callback", e.getMessage()); assertEquals("KABOOM!", e.getCause().getMessage()); assertFalse(isExecuted.get()); } @@ -295,6 +297,9 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa if (((String) record.key()).contains("FATAL")) { throw new RuntimeException("KABOOM!"); } + if (((String) record.key()).contains("NULL")) { + return null; + } assertProcessingExceptionHandlerInputs(context, record, exception); return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE; } @@ -308,9 +313,6 @@ public void configure(final Map configs) { public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { @Override public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { - if (((String) record.key()).contains("FATAL")) { - throw new RuntimeException("KABOOM!"); - } assertProcessingExceptionHandlerInputs(context, record, exception); return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL; } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 2623102885197..36d4434f344e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -51,6 +51,8 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; +import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse; +import org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationExceptionOrigin; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -1242,7 +1244,7 @@ public void shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueEx logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), + new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)), streamsMetrics, topology ); @@ -1269,7 +1271,7 @@ public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueE logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), + new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)), streamsMetrics, topology ); @@ -1293,7 +1295,7 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueE logContext, taskId, 
getExceptionalStreamsProducerOnSend(exception), - new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), + new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)), streamsMetrics, topology ); @@ -1317,7 +1319,7 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin taskId, getExceptionalStreamsProducerOnSend(new RuntimeException("KABOOM!")), new ProductionExceptionHandlerMock( - ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + Optional.of(ProductionExceptionHandlerResponse.CONTINUE), context, sinkNodeName, taskId @@ -1363,12 +1365,12 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin public void shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDefaultExceptionHandler() { final KafkaException exception = new TimeoutException("KABOOM!", new UnknownTopicOrPartitionException()); final RecordCollector collector = new RecordCollectorImpl( - logContext, - taskId, - getExceptionalStreamsProducerOnSend(exception), - productionExceptionHandler, - streamsMetrics, - topology + logContext, + taskId, + getExceptionalStreamsProducerOnSend(exception), + productionExceptionHandler, + streamsMetrics, + topology ); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); @@ -1378,10 +1380,10 @@ public void shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDef final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); assertEquals(exception, thrown.getCause()); assertThat( - thrown.getMessage(), - equalTo("Error encountered sending record to topic topic for task 0_0 due to:" + - "\norg.apache.kafka.common.errors.TimeoutException: KABOOM!" 
+ - "\nException handler choose to FAIL the processing, no more records would be sent.") + thrown.getMessage(), + equalTo("Error encountered sending record to topic topic for task 0_0 due to:" + + "\norg.apache.kafka.common.errors.TimeoutException: KABOOM!" + + "\nException handler choose to FAIL the processing, no more records would be sent.") ); } @@ -1389,17 +1391,17 @@ public void shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDef public void shouldNotThrowTaskCorruptedExceptionOnUnknownTopicOrPartitionExceptionUsingAlwaysContinueExceptionHandler() { final KafkaException exception = new TimeoutException("KABOOM!", new UnknownTopicOrPartitionException()); final RecordCollector collector = new RecordCollectorImpl( - logContext, - taskId, - getExceptionalStreamsProducerOnSend(exception), - new ProductionExceptionHandlerMock( - ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, - context, - sinkNodeName, - taskId - ), - streamsMetrics, - topology + logContext, + taskId, + getExceptionalStreamsProducerOnSend(exception), + new ProductionExceptionHandlerMock( + Optional.of(ProductionExceptionHandlerResponse.CONTINUE), + context, + sinkNodeName, + taskId + ), + streamsMetrics, + topology ); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); @@ -1539,11 +1541,11 @@ public void shouldThrowStreamsExceptionUsingDefaultExceptionHandler() { public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( - ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + Optional.of(ProductionExceptionHandlerResponse.CONTINUE), context, sinkNodeName, taskId, - ProductionExceptionHandler.SerializationExceptionOrigin.KEY + SerializationExceptionOrigin.KEY )); 
collector.initialize(); @@ -1568,11 +1570,11 @@ public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() { public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( - ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, + Optional.of(ProductionExceptionHandlerResponse.FAIL), context, sinkNodeName, taskId, - ProductionExceptionHandler.SerializationExceptionOrigin.VALUE + SerializationExceptionOrigin.VALUE )); collector.initialize(); @@ -1589,11 +1591,11 @@ public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProduction public void shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionExceptionHandlerRepliesWithFail() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( - ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, + Optional.of(ProductionExceptionHandlerResponse.FAIL), context, sinkNodeName, taskId, - ProductionExceptionHandler.SerializationExceptionOrigin.KEY + SerializationExceptionOrigin.KEY )); collector.initialize(); @@ -1606,11 +1608,109 @@ public void shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionEx } } + @Test + public void shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExceptionHandlerReturnsNull() { + try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + Optional.empty(), + context, + sinkNodeName, + taskId, + SerializationExceptionOrigin.KEY + )); + collector.initialize(); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> 
collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context) + ); + + assertEquals("Fatal user code error in production error callback", exception.getMessage()); + assertInstanceOf(NullPointerException.class, exception.getCause()); + assertEquals("Invalid ProductionExceptionHandler response.", exception.getCause().getMessage()); + } + } + + @Test + public void shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExceptionHandlerThrows() { + try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + null, // indicates to throw exception + context, + sinkNodeName, + taskId, + SerializationExceptionOrigin.KEY + )); + collector.initialize(); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context) + ); + + assertEquals("Fatal user code error in production error callback", exception.getMessage()); + assertEquals("CRASH", exception.getCause().getMessage()); + } + } + + @Test + public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsAndProductionExceptionHandlerReturnsNull() { + final KafkaException exception = new KafkaException("KABOOM!"); + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + getExceptionalStreamsProducerOnSend(exception), + new ProductionExceptionHandlerMock( + Optional.empty(), + context, + sinkNodeName, + taskId, + SerializationExceptionOrigin.KEY + ), + streamsMetrics, + topology + ); + + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); + + final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); + assertEquals("Fatal user code error in production error callback", thrown.getMessage()); + 
assertInstanceOf(NullPointerException.class, thrown.getCause()); + assertEquals("Invalid ProductionExceptionHandler response.", thrown.getCause().getMessage()); + } + + @Test + public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsAndProductionExceptionHandlerThrows() { + final KafkaException exception = new KafkaException("KABOOM!"); + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + getExceptionalStreamsProducerOnSend(exception), + new ProductionExceptionHandlerMock( + null, // indicates to throw exception + context, + sinkNodeName, + taskId, + SerializationExceptionOrigin.KEY + ), + streamsMetrics, + topology + ); + + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); + + final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); + assertEquals("Fatal user code error in production error callback", thrown.getMessage()); + assertEquals("CRASH", thrown.getCause().getMessage()); + } + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void shouldNotCallProductionExceptionHandlerOnClassCastException() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { - final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE)); + final RecordCollector collector = newRecordCollector( + new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)) + ); collector.initialize(); assertThat(mockProducer.history().isEmpty(), equalTo(true)); @@ -1766,17 +1866,17 @@ public byte[] serialize(final String topic, final Headers headers, final String } public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler { - private final ProductionExceptionHandlerResponse response; + private final Optional response; private InternalProcessorContext 
expectedContext; private String expectedProcessorNodeId; private TaskId expectedTaskId; private SerializationExceptionOrigin expectedSerializationExceptionOrigin; - public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response) { + public ProductionExceptionHandlerMock(final Optional response) { this.response = response; } - public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response, + public ProductionExceptionHandlerMock(final Optional response, final InternalProcessorContext context, final String processorNodeId, final TaskId taskId) { @@ -1786,7 +1886,7 @@ public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse r this.expectedTaskId = taskId; } - public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response, + public ProductionExceptionHandlerMock(final Optional response, final InternalProcessorContext context, final String processorNodeId, final TaskId taskId, @@ -1800,7 +1900,10 @@ public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext conte final ProducerRecord record, final Exception exception) { assertInputs(context, exception); - return response; + if (response == null) { + throw new RuntimeException("CRASH"); + } + return response.orElse(null); } @Override @@ -1810,7 +1913,10 @@ public ProductionExceptionHandlerResponse handleSerializationException(final Err final SerializationExceptionOrigin origin) { assertInputs(context, exception); assertEquals(expectedSerializationExceptionOrigin, origin); - return response; + if (response == null) { + throw new RuntimeException("CRASH"); + } + return response.orElse(null); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index 23f364fc6a317..1bca1c9e3795e 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; @@ -62,27 +63,29 @@ public class RecordDeserializerTest { @Test public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() { - final RecordDeserializer recordDeserializer = new RecordDeserializer( - new TheSourceNode( - sourceNodeName, - false, - false, - "key", - "value" - ), - null, - new LogContext(), - new Metrics().sensor("dropped-records") - ); - final ConsumerRecord record = recordDeserializer.deserialize(null, rawRecord); - assertEquals(rawRecord.topic(), record.topic()); - assertEquals(rawRecord.partition(), record.partition()); - assertEquals(rawRecord.offset(), record.offset()); - assertEquals("key", record.key()); - assertEquals("value", record.value()); - assertEquals(rawRecord.timestamp(), record.timestamp()); - assertEquals(TimestampType.CREATE_TIME, record.timestampType()); - assertEquals(rawRecord.headers(), record.headers()); + try (final Metrics metrics = new Metrics()) { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + sourceNodeName, + false, + false, + "key", + "value" + ), + null, + new LogContext(), + metrics.sensor("dropped-records") + ); + final ConsumerRecord record = recordDeserializer.deserialize(null, rawRecord); + assertEquals(rawRecord.topic(), record.topic()); + assertEquals(rawRecord.partition(), record.partition()); + 
assertEquals(rawRecord.offset(), record.offset()); + assertEquals("key", record.key()); + assertEquals("value", record.value()); + assertEquals(rawRecord.timestamp(), record.timestamp()); + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + assertEquals(rawRecord.headers(), record.headers()); + } } @ParameterizedTest @@ -93,30 +96,35 @@ public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() { }) public void shouldThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithFail(final boolean keyThrowsException, final boolean valueThrowsException) { - final RecordDeserializer recordDeserializer = new RecordDeserializer( - new TheSourceNode( - sourceNodeName, - keyThrowsException, - valueThrowsException, - "key", - "value" - ), - new DeserializationExceptionHandlerMock( - DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, - rawRecord, - sourceNodeName, - taskId - ), - new LogContext(), - new Metrics().sensor("dropped-records") - ); - - final StreamsException e = assertThrows(StreamsException.class, () -> recordDeserializer.deserialize(context, rawRecord)); - assertEquals(e.getMessage(), "Deserialization exception handler is set " - + "to fail upon a deserialization error. 
" - + "If you would rather have the streaming pipeline " - + "continue after a deserialization error, please set the " - + "default.deserialization.exception.handler appropriately."); + try (final Metrics metrics = new Metrics()) { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + sourceNodeName, + keyThrowsException, + valueThrowsException, + "key", + "value" + ), + new DeserializationExceptionHandlerMock( + Optional.of(DeserializationHandlerResponse.FAIL), + rawRecord, + sourceNodeName, + taskId + ), + new LogContext(), + metrics.sensor("dropped-records") + ); + + final StreamsException e = assertThrows(StreamsException.class, () -> recordDeserializer.deserialize(context, rawRecord)); + assertEquals( + e.getMessage(), + "Deserialization exception handler is set " + + "to fail upon a deserialization error. " + + "If you would rather have the streaming pipeline " + + "continue after a deserialization error, please set the " + + "default.deserialization.exception.handler appropriately." 
+ ); + } } @ParameterizedTest @@ -127,26 +135,89 @@ public void shouldThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandl }) public void shouldNotThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithContinue(final boolean keyThrowsException, final boolean valueThrowsException) { - final RecordDeserializer recordDeserializer = new RecordDeserializer( - new TheSourceNode( - sourceNodeName, - keyThrowsException, - valueThrowsException, - "key", - "value" - ), - new DeserializationExceptionHandlerMock( - DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE, - rawRecord, - sourceNodeName, - taskId - ), - new LogContext(), - new Metrics().sensor("dropped-records") - ); - - final ConsumerRecord record = recordDeserializer.deserialize(context, rawRecord); - assertNull(record); + try (final Metrics metrics = new Metrics()) { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + sourceNodeName, + keyThrowsException, + valueThrowsException, + "key", + "value" + ), + new DeserializationExceptionHandlerMock( + Optional.of(DeserializationHandlerResponse.CONTINUE), + rawRecord, + sourceNodeName, + taskId + ), + new LogContext(), + metrics.sensor("dropped-records") + ); + + final ConsumerRecord record = recordDeserializer.deserialize(context, rawRecord); + assertNull(record); + } + } + + @Test + public void shouldFailWhenDeserializationFailsAndExceptionHandlerReturnsNull() { + try (final Metrics metrics = new Metrics()) { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + sourceNodeName, + true, + false, + "key", + "value" + ), + new DeserializationExceptionHandlerMock( + Optional.empty(), + rawRecord, + sourceNodeName, + taskId + ), + new LogContext(), + metrics.sensor("dropped-records") + ); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> recordDeserializer.deserialize(context, rawRecord) + ); + 
assertEquals("Fatal user code error in deserialization error callback", exception.getMessage()); + assertInstanceOf(NullPointerException.class, exception.getCause()); + assertEquals("Invalid DeserializationExceptionHandler response.", exception.getCause().getMessage()); + } + } + + @Test + public void shouldFailWhenDeserializationFailsAndExceptionHandlerThrows() { + try (final Metrics metrics = new Metrics()) { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + sourceNodeName, + true, + false, + "key", + "value" + ), + new DeserializationExceptionHandlerMock( + null, // indicate to throw an exception + rawRecord, + sourceNodeName, + taskId + ), + new LogContext(), + metrics.sensor("dropped-records") + ); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> recordDeserializer.deserialize(context, rawRecord) + ); + assertEquals("Fatal user code error in deserialization error callback", exception.getMessage()); + assertEquals("CRASH", exception.getCause().getMessage()); + } } static class TheSourceNode extends SourceNode { @@ -185,12 +256,12 @@ public Object deserializeValue(final String topic, final Headers headers, final } public static class DeserializationExceptionHandlerMock implements DeserializationExceptionHandler { - private final DeserializationHandlerResponse response; + private final Optional response; private final ConsumerRecord expectedRecord; private final String expectedProcessorNodeId; private final TaskId expectedTaskId; - public DeserializationExceptionHandlerMock(final DeserializationHandlerResponse response, + public DeserializationExceptionHandlerMock(final Optional response, final ConsumerRecord record, final String processorNodeId, final TaskId taskId) { @@ -212,7 +283,10 @@ public DeserializationHandlerResponse handle(final ErrorHandlerContext context, assertEquals(expectedRecord, record); assertInstanceOf(RuntimeException.class, exception); assertEquals("KABOOM!", 
exception.getMessage()); - return response; + if (response == null) { + throw new RuntimeException("CRASH"); + } + return response.orElse(null); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index a8771c21539da..3fa33ef895410 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler; import org.apache.kafka.streams.errors.ProcessingExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler.ProcessingHandlerResponse; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; @@ -2661,17 +2662,20 @@ public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() { } @Test - public void shouldPunctuateNotHandleFailProcessingExceptionAndThrowStreamsException() { + public void punctuateShouldNotHandleFailProcessingExceptionAndThrowStreamsException() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName())); + task = createStatelessTask(createConfig( + AT_LEAST_ONCE, + "100", + LogAndFailExceptionHandler.class.getName(), + LogAndContinueProcessingExceptionHandler.class.getName() + )); - final StreamsException streamsException = assertThrows(StreamsException.class, () -> - task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { - 
throw new FailedProcessingException( - new RuntimeException("KABOOM!") - ); + final StreamsException streamsException = assertThrows( + StreamsException.class, + () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { + throw new FailedProcessingException(new RuntimeException("KABOOM!")); }) ); @@ -2680,11 +2684,15 @@ public void shouldPunctuateNotHandleFailProcessingExceptionAndThrowStreamsExcept } @Test - public void shouldPunctuateNotHandleTaskCorruptedExceptionAndThrowItAsIs() { + public void punctuateShouldNotHandleTaskCorruptedExceptionAndThrowItAsIs() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName())); + task = createStatelessTask(createConfig( + AT_LEAST_ONCE, + "100", + LogAndFailExceptionHandler.class.getName(), + LogAndContinueProcessingExceptionHandler.class.getName() + )); final Set tasksIds = new HashSet<>(); tasksIds.add(new TaskId(0, 0)); @@ -2695,8 +2703,9 @@ public Set partitions() { } }); - final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, () -> - task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { + final TaskCorruptedException taskCorruptedException = assertThrows( + TaskCorruptedException.class, + () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { throw expectedException; }) ); @@ -2705,16 +2714,21 @@ public Set partitions() { } @Test - public void shouldPunctuateNotHandleTaskMigratedExceptionAndThrowItAsIs() { + public void punctuateShouldNotHandleTaskMigratedExceptionAndThrowItAsIs() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100", - 
LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName())); + task = createStatelessTask(createConfig( + AT_LEAST_ONCE, + "100", + LogAndFailExceptionHandler.class.getName(), + LogAndContinueProcessingExceptionHandler.class.getName() + )); final TaskMigratedException expectedException = new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause")); - final TaskMigratedException taskCorruptedException = assertThrows(TaskMigratedException.class, () -> - task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { + final TaskMigratedException taskCorruptedException = assertThrows( + TaskMigratedException.class, + () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { throw expectedException; }) ); @@ -2723,56 +2737,106 @@ public void shouldPunctuateNotHandleTaskMigratedExceptionAndThrowItAsIs() { } @Test - public void shouldPunctuateNotThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithContinue() { + public void punctuateShouldNotThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithContinue() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName())); + task = createStatelessTask(createConfig( + AT_LEAST_ONCE, + "100", + LogAndFailExceptionHandler.class.getName(), + LogAndContinueProcessingExceptionHandler.class.getName() + )); - assertDoesNotThrow(() -> - task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { + task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { + throw new KafkaException("KABOOM!"); + }); + } + + @Test + public void punctuateShouldThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithFail() { + 
when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + task = createStatelessTask(createConfig( + AT_LEAST_ONCE, + "100", + LogAndFailExceptionHandler.class.getName(), + LogAndFailProcessingExceptionHandler.class.getName() + )); + + final StreamsException streamsException = assertThrows( + StreamsException.class, + () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { throw new KafkaException("KABOOM!"); }) ); + + assertInstanceOf(KafkaException.class, streamsException.getCause()); + assertEquals("KABOOM!", streamsException.getCause().getMessage()); } @Test - public void shouldPunctuateThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithFail() { + public void punctuateShouldThrowStreamsExceptionWhenProcessingExceptionHandlerReturnsNull() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName())); + task = createStatelessTask(createConfig( + AT_LEAST_ONCE, + "100", + LogAndFailExceptionHandler.class.getName(), + NullProcessingExceptionHandler.class.getName() + )); - final StreamsException streamsException = assertThrows(StreamsException.class, + final StreamsException streamsException = assertThrows( + StreamsException.class, () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { throw new KafkaException("KABOOM!"); - })); + }) + ); - assertInstanceOf(KafkaException.class, streamsException.getCause()); - assertEquals("KABOOM!", streamsException.getCause().getMessage()); + assertEquals("Fatal user code error in processing error callback", streamsException.getMessage()); + assertInstanceOf(NullPointerException.class, streamsException.getCause()); + assertEquals("Invalid ProcessingExceptionHandler response.", 
streamsException.getCause().getMessage()); } @Test - public void shouldPunctuateThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() { + public void punctuateShouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100", - LogAndFailExceptionHandler.class.getName(), ProcessingExceptionHandlerMock.class.getName())); + task = createStatelessTask(createConfig( + AT_LEAST_ONCE, + "100", + LogAndFailExceptionHandler.class.getName(), + CrashingProcessingExceptionHandler.class.getName() + )); - final FailedProcessingException streamsException = assertThrows(FailedProcessingException.class, + final FailedProcessingException streamsException = assertThrows( + FailedProcessingException.class, () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { throw new KafkaException("KABOOM!"); - })); + }) + ); - assertInstanceOf(RuntimeException.class, streamsException.getCause()); + assertEquals("Fatal user code error in processing error callback", streamsException.getMessage()); assertEquals("KABOOM from ProcessingExceptionHandlerMock!", streamsException.getCause().getMessage()); } - public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler { + public static class CrashingProcessingExceptionHandler implements ProcessingExceptionHandler { @Override - public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!"); } + + @Override + public void configure(final Map configs) { + // No-op + } + } + + public static class 
NullProcessingExceptionHandler implements ProcessingExceptionHandler { + @Override + public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + return null; + } + @Override public void configure(final Map configs) { // No-op From f13ae88b37a4ce3d4640e2e9a43d99172bc16c23 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 31 Jul 2024 22:02:36 -0700 Subject: [PATCH 2/5] fix typo --- .../kafka/streams/errors/DeserializationExceptionHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 0d64611de678d..198a97cce448a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -37,7 +37,7 @@ public interface DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception - * @deprecated Since 3.9. Use Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} + * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead. */ @Deprecated default DeserializationHandlerResponse handle(final ProcessorContext context, From 87cef8f69386d071d574b6421816fda8808313b3 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Wed, 31 Jul 2024 22:19:11 -0700 Subject: [PATCH 3/5] cleanup --- .../streams/processor/internals/RecordCollectorImpl.java | 8 ++++---- .../kafka/streams/processor/internals/StreamTask.java | 2 +- .../ProcessingExceptionHandlerIntegrationTest.java | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 86e1ebc1be9ee..e471587ed0db4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -335,15 +335,15 @@ private void handleException(final ProductionExceptionHandler.Serializati throw new StreamsException( String.format( "Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", - topic, - partition, - timestamp), + topic, + partition, + timestamp), serializationException ); } log.warn("Unable to serialize record, continue processing. " + - "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", + "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", topic, partition, timestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index c3a5c59c7753b..f08cfa7fd671c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -955,7 +955,7 @@ record = null; processingExceptionHandler.handle(errorHandlerContext, null, processingException), "Invalid ProcessingExceptionHandler response." 
); - } catch (final Exception fatalUserException) { + } catch (final RuntimeException fatalUserException) { log.error( "Processing error callback failed after processing error for record: {}", errorHandlerContext, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index f3c4471e64522..d0c32310550dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -62,7 +62,7 @@ public class ProcessingExceptionHandlerIntegrationTest { private final String threadId = Thread.currentThread().getName(); @Test - public void shouldFailWhenProcessingExceptionOccursIfExceptionHanlderReturnFail() { + public void shouldFailWhenProcessingExceptionOccursIfExceptionHandlerReturnsFail() { final List> events = Arrays.asList( new KeyValue<>("ID123-1", "ID123-A1"), new KeyValue<>("ID123-2-ERR", "ID123-A2"), @@ -107,7 +107,7 @@ public void shouldFailWhenProcessingExceptionOccursIfExceptionHanlderReturnFail( } @Test - public void shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnContinue() { + public void shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnsContinue() { final List> events = Arrays.asList( new KeyValue<>("ID123-1", "ID123-A1"), new KeyValue<>("ID123-2-ERR", "ID123-A2"), From 507b980ec7badc63e30372060799dc1a4dbf7476 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Thu, 1 Aug 2024 22:18:27 -0700 Subject: [PATCH 4/5] adjust tests --- .../kafka/streams/integration/EosIntegrationTest.java | 5 ++++- .../streams/integration/EosV2UpgradeIntegrationTest.java | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 94e48ee3d4991..3b588439e92af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -1170,7 +1171,9 @@ public void process(final Record record) { final KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.setUncaughtExceptionHandler((t, e) -> { - if (uncaughtException != null || !e.getMessage().contains("Injected test exception")) { + if (uncaughtException != null || + !(e instanceof StreamsException) || + !e.getCause().getMessage().equals("Injected test exception.")) { e.printStackTrace(System.err); hasUnexpectedError = true; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java index 8653f69fca6dd..4dd8e5697c41b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -947,8 +948,9 @@ public void process(final Record record) { } else { int exceptionCount = exceptionCounts.get(appDir); // should only have our injected exception or commit exception, and 2 exceptions for each stream - if (++exceptionCount > 2 || !(e instanceof RuntimeException) || - !(e.getMessage().contains("test exception"))) { + if (++exceptionCount > 2 || + !(e instanceof StreamsException) || + !(e.getCause().getMessage().endsWith(" test exception."))) { // The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream. // So, log to stderr for debugging when the exception is not what we expected, and fail in the main thread e.printStackTrace(System.err); From 646ca1e14448e8e0f8ef873b108579aafd1636a4 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Fri, 2 Aug 2024 12:51:03 -0700 Subject: [PATCH 5/5] Github comments --- .../internals/RecordCollectorTest.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 36d4434f344e6..ec8f6e5a9f971 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -1635,7 +1635,7 @@ public void shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExcep public void shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExceptionHandlerThrows() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( - null, // indicates to throw exception + true, context, sinkNodeName, taskId, @@ -1687,7 +1687,7 @@ public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsAndProductio taskId, getExceptionalStreamsProducerOnSend(exception), new ProductionExceptionHandlerMock( - null, // indicates to throw exception + true, context, sinkNodeName, taskId, @@ -1867,6 +1867,7 @@ public byte[] serialize(final String topic, final Headers headers, final String public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler { private final Optional response; + private boolean shouldThrowException; private InternalProcessorContext expectedContext; private String expectedProcessorNodeId; private TaskId expectedTaskId; @@ -1886,6 +1887,16 @@ public ProductionExceptionHandlerMock(final Optional context, + final String processorNodeId, + final TaskId taskId, + final SerializationExceptionOrigin origin) { + this(Optional.empty(), context, processorNodeId, taskId); + 
this.expectedSerializationExceptionOrigin = origin; + this.shouldThrowException = shouldThrowException; + } + public ProductionExceptionHandlerMock(final Optional response, final InternalProcessorContext context, final String processorNodeId, @@ -1893,6 +1904,7 @@ public ProductionExceptionHandlerMock(final Optional record, final Exception exception) { assertInputs(context, exception); - if (response == null) { + if (shouldThrowException) { throw new RuntimeException("CRASH"); } return response.orElse(null); @@ -1913,7 +1925,7 @@ public ProductionExceptionHandlerResponse handleSerializationException(final Err final SerializationExceptionOrigin origin) { assertInputs(context, exception); assertEquals(expectedSerializationExceptionOrigin, origin); - if (response == null) { + if (shouldThrowException) { throw new RuntimeException("CRASH"); } return response.orElse(null);