diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 70e4cba9625ac..5685fc4b40720 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -39,6 +39,8 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingLogAndFailExceptionHandler; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.StreamsConfigUtils; @@ -553,6 +555,11 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface."; + /** {@code processing.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler"; + public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface."; + /** {@code default.dsl.store} */ @Deprecated @SuppressWarnings("WeakerAccess") @@ -926,6 +933,11 @@ public class StreamsConfig extends AbstractConfig { DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) + .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + 
ProcessingLogAndFailExceptionHandler.class.getName(), + Importance.MEDIUM, + PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, FailOnInvalidTimestamp.class.getName(), @@ -1915,6 +1927,11 @@ public ProductionExceptionHandler defaultProductionExceptionHandler() { return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); } + @SuppressWarnings("WeakerAccess") + public ProcessingExceptionHandler processingExceptionHandler() { + return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); + } + /** * Override any client properties in the original configs with overrides * diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index 2d4157eba7885..b333de60bc7ad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -22,15 +22,16 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.TimestampExtractor; - import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; import org.apache.kafka.streams.state.DslStoreSuppliers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.Optional; import java.util.Properties; import java.util.function.Supplier; @@ -57,6 +58,7 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC; import static 
org.apache.kafka.streams.StreamsConfig.ROCKS_DB; import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; /** @@ -135,6 +137,7 @@ public class TopologyConfig extends AbstractConfig { public final Class dslStoreSuppliers; public final Supplier timestampExtractorSupplier; public final Supplier deserializationExceptionHandlerSupplier; + public final Supplier processingExceptionHandler; public TopologyConfig(final StreamsConfig globalAppConfigs) { this(null, globalAppConfigs, new Properties()); @@ -225,6 +228,13 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } + if (isTopologyOverride(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) { + processingExceptionHandler = () -> getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); + log.info("Topology {} is overriding {} to {}", topologyName, PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG)); + } else { + processingExceptionHandler = () -> globalAppConfigs.getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); + } + if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) { storeType = getString(DEFAULT_DSL_STORE_CONFIG); log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType); @@ -280,7 +290,8 @@ public TaskConfig getTaskConfig() { maxBufferedSize, timestampExtractorSupplier.get(), deserializationExceptionHandlerSupplier.get(), - eosEnabled + eosEnabled, + processingExceptionHandler.get() ); } @@ -291,19 +302,22 @@ 
public static class TaskConfig { public final TimestampExtractor timestampExtractor; public final DeserializationExceptionHandler deserializationExceptionHandler; public final boolean eosEnabled; + public final ProcessingExceptionHandler processingExceptionHandler; private TaskConfig(final long maxTaskIdleMs, final long taskTimeoutMs, final int maxBufferedSize, final TimestampExtractor timestampExtractor, final DeserializationExceptionHandler deserializationExceptionHandler, - final boolean eosEnabled) { + final boolean eosEnabled, + final ProcessingExceptionHandler processingExceptionHandler) { this.maxTaskIdleMs = maxTaskIdleMs; this.taskTimeoutMs = taskTimeoutMs; this.maxBufferedSize = maxBufferedSize; this.timestampExtractor = timestampExtractor; this.deserializationExceptionHandler = deserializationExceptionHandler; this.eosEnabled = eosEnabled; + this.processingExceptionHandler = processingExceptionHandler; } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java index 4fdb1a3fc0a95..a976346141277 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java @@ -18,6 +18,7 @@ import java.util.Map; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.streams.processor.ErrorHandlerContext; /** * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception @@ -25,7 +26,8 @@ */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { @Override - public ProductionExceptionHandlerResponse handle(final ProducerRecord record, + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, final Exception exception) { return 
ProductionExceptionHandlerResponse.FAIL; } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 95ccfeced8e43..6b2cc335c34e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -16,10 +16,11 @@ */ package org.apache.kafka.streams.errors; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.processor.ErrorHandlerContext; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ErrorHandlerContextImpl; /** * Interface that specifies how an exception from source node deserialization @@ -37,11 +38,28 @@ public interface DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception + * @deprecated Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead */ - @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. - DeserializationHandlerResponse handle(final ProcessorContext context, - final ConsumerRecord record, - final Exception exception); + @Deprecated + default DeserializationHandlerResponse handle(final ProcessorContext context, + final ConsumerRecord record, + final Exception exception) { + + throw new UnsupportedOperationException(); + } + /** + * Inspect a record and the exception received.
+ * + * @param context error handler context + * @param record record that failed deserialization + * @param exception the actual exception + */ + default DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { + + return handle(((ErrorHandlerContextImpl) context).convertToProcessorContext(), record, exception); + } /** * Enumeration that describes the response from the exception handler. diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java index 4f9a0964405a2..0494b44500210 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.ErrorHandlerContext; import org.apache.kafka.streams.processor.ProcessorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class); @Override + @Deprecated public DeserializationHandlerResponse handle(final ProcessorContext context, final ConsumerRecord record, final Exception exception) { @@ -44,6 +46,19 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, return DeserializationHandlerResponse.CONTINUE; } + @Override + public DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { + + log.warn("Exception caught during Deserialization, " + + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), 
+ exception); + + return DeserializationHandlerResponse.CONTINUE; + } + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java index 61d210649ba9a..5fc197a10b03e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.ErrorHandlerContext; import org.apache.kafka.streams.processor.ProcessorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ public class LogAndFailExceptionHandl private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class); @Override + @Deprecated public DeserializationHandlerResponse handle(final ProcessorContext context, final ConsumerRecord record, final Exception exception) { @@ -44,6 +46,19 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, return DeserializationHandlerResponse.FAIL; } + @Override + public DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { + + log.error("Exception caught during Deserialization, " + + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), + exception); + + return DeserializationHandlerResponse.FAIL; + } + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java new file
mode 100644 index 0000000000000..acf4c58fbc481 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.processor.ErrorHandlerContext; +import org.apache.kafka.streams.processor.api.Record; + +/** + * An interface that allows user code to inspect a record that has failed processing + */ +public interface ProcessingExceptionHandler extends Configurable { + /** + * Inspect a record and the exception received + * + * @param context processing context metadata + * @param record record where the exception occurred + * @param exception the actual exception + */ + ProcessingHandlerResponse handle(ErrorHandlerContext context, Record record, Exception exception); + + enum ProcessingHandlerResponse { + /* continue with processing */ + CONTINUE(1, "CONTINUE"), + /* fail the processing and stop */ + FAIL(2, "FAIL"); + + /** + * the permanent and immutable name of processing exception response + */ + public final String name; + + /** + * the permanent and immutable id of processing exception 
response + */ + public final int id; + + ProcessingHandlerResponse(final int id, final String name) { + this.id = id; + this.name = name; + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndContinueExceptionHandler.java new file mode 100644 index 0000000000000..93d6a330c4bab --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndContinueExceptionHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.streams.processor.ErrorHandlerContext; +import org.apache.kafka.streams.processor.api.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Processing exception handler that logs a processing exception and then + * signals the processing pipeline to continue processing more records. 
+ */ +public class ProcessingLogAndContinueExceptionHandler implements ProcessingExceptionHandler { + private static final Logger log = LoggerFactory.getLogger(ProcessingLogAndContinueExceptionHandler.class); + + @Override + public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + log.warn("Exception caught during message processing, " + + "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", + context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(), + exception); + + return ProcessingHandlerResponse.CONTINUE; + } + + @Override + public void configure(final Map configs) { + // ignore + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndFailExceptionHandler.java new file mode 100644 index 0000000000000..facb300a9692b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndFailExceptionHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.streams.processor.ErrorHandlerContext; +import org.apache.kafka.streams.processor.api.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Processing exception handler that logs a processing exception and then + * signals the processing pipeline to stop processing more records and fail. + */ +public class ProcessingLogAndFailExceptionHandler implements ProcessingExceptionHandler { + private static final Logger log = LoggerFactory.getLogger(ProcessingLogAndFailExceptionHandler.class); + + @Override + public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + log.error("Exception caught during message processing, " + + "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", + context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(), + exception); + + return ProcessingHandlerResponse.FAIL; + } + + @Override + public void configure(final Map configs) { + // ignore + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 6ae0170bfc906..71b160b5d2d1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.processor.ErrorHandlerContext; /** * Interface that specifies how an exception when attempting to produce a result to @@ -30,22 +31,60 @@ public interface ProductionExceptionHandler extends Configurable { * * @param record The record that failed to produce + * @param exception The
exception that occurred during production + * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead */ - ProductionExceptionHandlerResponse handle(final ProducerRecord record, - final Exception exception); + @Deprecated + default ProductionExceptionHandlerResponse handle(final ProducerRecord record, + final Exception exception) { + throw new UnsupportedOperationException(); + } + + /** + * Inspect a record that we attempted to produce, and the exception that resulted + * from attempting to produce it and determine whether or not to continue processing. + * + * @param context The error handler context metadata + * @param record The record that failed to produce + * @param exception The exception that occurred during production + */ + @SuppressWarnings("deprecation") + default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { + return handle(record, exception); + } /** * Handles serialization exception and determine if the process should continue. The default implementation is to * fail the process. * - * @param record the record that failed to serialize - * @param exception the exception that occurred during serialization + * @param record the record that failed to serialize + * @param exception the exception that occurred during serialization + * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead */ + @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.FAIL; } + /** + * Handles serialization exception and determine if the process should continue. The default implementation is to + * fail the process. 
+ * + * @param context the error handler context metadata + * @param record the record that failed to serialize + * @param exception the exception that occurred during serialization + * @param origin the origin of the serialization exception + */ + @SuppressWarnings("deprecation") + default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + return handleSerializationException(record, exception); + } + enum ProductionExceptionHandlerResponse { /* continue processing */ CONTINUE(0, "CONTINUE"), @@ -68,4 +107,11 @@ enum ProductionExceptionHandlerResponse { this.name = name; } } + + enum SerializationExceptionOrigin { + /* serialization exception occurred during serialization of the key */ + KEY, + /* serialization exception occurred during serialization of the value */ + VALUE + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ErrorHandlerContext.java new file mode 100644 index 0000000000000..5a852d415a94c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ErrorHandlerContext.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; + + +/** + * ErrorHandlerContext interface + */ +public interface ErrorHandlerContext { + + /** + * Return the topic name of the current input record; could be {@code null} if it is not + * available. + * + *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, the record won't have an associated topic. + * Another example is + * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} + * (and siblings), that do not always guarantee to provide a valid topic name, as they might be + * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. + * + * @return the topic name + */ + String topic(); + + /** + * Return the partition id of the current input record; could be {@code -1} if it is not + * available. + * + *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, the record won't have an associated partition id. + * Another example is + * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} + * (and siblings), that do not always guarantee to provide a valid partition id, as they might be + * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. + * + * @return the partition id + */ + int partition(); + + /** + * Return the offset of the current input record; could be {@code -1} if it is not + * available. + * + *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, the record won't have an associated offset. + * Another example is + * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} + * (and siblings), that do not always guarantee to provide a valid offset, as they might be + * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. + * + * @return the offset + */ + long offset(); + + /** + * Return the headers of the current source record; could be an empty header if it is not + * available. + * + *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, the record might not have any associated headers. + * Another example is + * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} + * (and siblings), that do not always guarantee to provide valid headers, as they might be + * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. + * + * @return the headers + */ + Headers headers(); + + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + *

If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + *

If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent + * to the repartition topic. + * + *

Always returns null if this method is invoked within a + * {@link org.apache.kafka.streams.errors.ProductionExceptionHandler#handle(ErrorHandlerContext, org.apache.kafka.clients.producer.ProducerRecord, Exception)} + * + * @return the raw byte of the key of the source message + */ + byte[] sourceRawKey(); + + /** + * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. + * + *

If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + *

If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent + * to the repartition topic. + * + *

Always returns null if this method is invoked within a + * {@link org.apache.kafka.streams.errors.ProductionExceptionHandler#handle(ErrorHandlerContext, org.apache.kafka.clients.producer.ProducerRecord, Exception)} + * + * @return the raw byte of the value of the source message + */ + byte[] sourceRawValue(); + + /** + * Return the current processor node id. + * + * @return the processor node id + */ + String processorNodeId(); + + /** + * Return the task id. + * + * @return the task id + */ + TaskId taskId(); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java index 12af39ca4dbd3..7183526d27ca0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractPartitionGroup.java @@ -55,6 +55,8 @@ abstract class AbstractPartitionGroup { abstract Long headRecordOffset(final TopicPartition partition); + abstract ConsumerRecord rawHeadRecord(); + abstract int numBuffered(); abstract int numBuffered(TopicPartition tp); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ErrorHandlerContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ErrorHandlerContextImpl.java new file mode 100644 index 0000000000000..0b4830cd13df6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ErrorHandlerContextImpl.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.processor.ErrorHandlerContext; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.TaskId; + +public class ErrorHandlerContextImpl implements ErrorHandlerContext { + private InternalProcessorContext processorContext; + private final String topic; + private final int partition; + private final long offset; + private final Headers headers; + private final byte[] sourceRawKey; + private final byte[] sourceRawValue; + private final String processorNodeId; + private final TaskId taskId; + + public ErrorHandlerContextImpl(final String topic, + final int partition, + final long offset, + final Headers headers, + final byte[] sourceRawKey, + final byte[] sourceRawValue, + final String processorNodeId, + final TaskId taskId) { + this(null, topic, partition, offset, headers, sourceRawKey, sourceRawValue, processorNodeId, taskId); + } + + public ErrorHandlerContextImpl(final InternalProcessorContext processorContext, + final String topic, + final int partition, + final long offset, + final Headers headers, + final byte[] sourceRawKey, + final byte[] sourceRawValue, + final String processorNodeId, + final TaskId taskId) { + this.processorContext = processorContext; + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.headers = headers; + this.sourceRawKey = sourceRawKey; + this.sourceRawValue = sourceRawValue; + this.processorNodeId = 
processorNodeId; + this.taskId = taskId; + } + + @Override + public String topic() { + return this.topic; + } + + @Override + public int partition() { + return this.partition; + } + + @Override + public long offset() { + return this.offset; + } + + @Override + public Headers headers() { + return this.headers; + } + + @Override + public byte[] sourceRawKey() { + return this.sourceRawKey; + } + + @Override + public byte[] sourceRawValue() { + return this.sourceRawValue; + } + + @Override + public String processorNodeId() { + return this.processorNodeId; + } + + @Override + public TaskId taskId() { + return this.taskId; + } + + public ProcessorContext convertToProcessorContext() { + return new RestrictiveProcessorContext(this.processorContext); + + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index d42e90bf33f4c..76a09985218af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -329,7 +329,8 @@ private void reprocessState(final List topicPartitions, Thread.currentThread().getName(), globalProcessorContext.taskId().toString(), globalProcessorContext.metrics() - ) + ), + null ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 4852ba97932e9..f52a1feb01785 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -71,6 +71,7 @@ class PartitionGroup extends AbstractPartitionGroup { private boolean allBuffered; private final Map idlePartitionDeadlines = new HashMap<>(); 
private final Map fetchedLags = new HashMap<>(); + private ConsumerRecord rawHeadRecord; PartitionGroup(final LogContext logContext, final Map partitionQueues, @@ -248,6 +249,7 @@ StampedRecord nextRecord(final RecordInfo info, final long wallClockTime) { if (queue != null) { // get the first record from this queue. + rawHeadRecord = queue.rawHeadRecord(); record = queue.poll(wallClockTime); if (record != null) { @@ -321,6 +323,16 @@ Long headRecordOffset(final TopicPartition partition) { return recordQueue.headRecordOffset(); } + /** + * Returns the raw head record + * + * @return the raw head record + */ + @Override + ConsumerRecord rawHeadRecord() { + return rawHeadRecord; + } + /** * @throws IllegalStateException if the record's partition does not belong to this partition group */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index b484d26f0fe87..7cbd643ae2f6f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -20,10 +20,13 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.ErrorHandlerContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; @@ -33,10 +36,13 @@ 
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.PositionSerde; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.HashMap; @@ -44,12 +50,15 @@ import java.util.Map; import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore; import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { + private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class); + // the below are null for standby tasks private StreamTask streamTask; private RecordCollector collector; @@ -57,7 +66,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext cacheNameToFlushListener = new HashMap<>(); + private final Sensor droppedRecordsSensor; @SuppressWarnings("this-escape") public ProcessorContextImpl(final TaskId id, @@ -71,6 +84,10 @@ public ProcessorContextImpl(final TaskId id, appConfigs(), IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, false); + processingExceptionHandler = 
config.processingExceptionHandler(); + + final String threadId = Thread.currentThread().getName(); + droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId().toString(), metrics()); } @Override @@ -288,7 +305,39 @@ private void forwardInternal(final ProcessorNode child, final Record record) { setCurrentNode(child); - child.process(record); + try { + child.process(record); + } catch (final Exception e) { + // prevent parent nodes to throw exception + if (!processingExceptionOccurred) { + processingExceptionOccurred = true; + final byte[] rawKey = streamTask.rawRecord() != null ? streamTask.rawRecord().key() : null; + final byte[] rawValue = streamTask.rawRecord() != null ? streamTask.rawRecord().value() : null; + + final ErrorHandlerContext errorHandlerContext = new ErrorHandlerContextImpl(topic(), + partition(), offset(), headers(), rawKey, rawValue, + child.name(), taskId()); + final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler + .handle(errorHandlerContext, record, e); + + if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { + throw new StreamsException("Processing exception handler is set to fail upon" + + " a processing error. If you would rather have the streaming pipeline" + + " continue after a processing error, please set the " + + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", + e); + } else { + log.warn( + "Skipping record due to processing error. 
topic=[{}] partition=[{}] offset=[{}]", + topic(), + partition(), + offset(), + e + ); + droppedRecordsSensor.record(); + } + } + } if (child.isTerminalNode()) { streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 6b559180484aa..b1989281cc945 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -163,7 +163,7 @@ public void send(final String topic, + "topic=[{}]", topic); droppedRecordsSensor.record(); } else { - for (final int multicastPartition: multicastPartitions) { + for (final int multicastPartition : multicastPartitions) { send(topic, key, value, headers, multicastPartition, timestamp, keySerializer, valueSerializer, processorNodeId, context); } } @@ -191,61 +191,22 @@ public void send(final String topic, final InternalProcessorContext context) { checkForException(); - final byte[] keyBytes; - final byte[] valBytes; + byte[] keyBytes = null; + byte[] valBytes = null; try { keyBytes = keySerializer.serialize(topic, headers, key); + } catch (final ClassCastException exception) { + this.manageClassCastException(topic, key, value, keySerializer, valueSerializer, exception); + } catch (final Exception exception) { + this.manageOtherExceptions(topic, key, value, headers, partition, timestamp, processorNodeId, context, exception, ProductionExceptionHandler.SerializationExceptionOrigin.KEY); + return; + } + try { valBytes = valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { - final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); - final String valueClass = value == null ? 
"unknown because value is null" : value.getClass().getName(); - throw new StreamsException( - String.format( - "ClassCastException while producing data to topic %s. " + - "A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + - "(key type: %s / value type: %s). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", - topic, - keySerializer.getClass().getName(), - valueSerializer.getClass().getName(), - keyClass, - valueClass), - exception); + this.manageClassCastException(topic, key, value, keySerializer, valueSerializer, exception); } catch (final Exception exception) { - final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); - final ProductionExceptionHandler.ProductionExceptionHandlerResponse response; - - log.debug(String.format("Error serializing record to topic %s", topic), exception); - - try { - response = productionExceptionHandler.handleSerializationException(record, exception); - } catch (final Exception e) { - log.error("Fatal when handling serialization exception", e); - recordSendError(topic, e, null); - return; - } - - if (response == ProductionExceptionHandlerResponse.FAIL) { - throw new StreamsException( - String.format( - "Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", - topic, - partition, - timestamp), - exception - ); - } - - log.warn("Unable to serialize record, continue processing. 
" + - "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", - topic, - partition, - timestamp); - - droppedRecordsSensor.record(); - + this.manageOtherExceptions(topic, key, value, headers, partition, timestamp, processorNodeId, context, exception, ProductionExceptionHandler.SerializationExceptionOrigin.VALUE); return; } @@ -282,7 +243,7 @@ public void send(final String topic, topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs()); } } else { - recordSendError(topic, exception, serializedRecord); + recordSendError(topic, exception, serializedRecord, context, processorNodeId); // KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition); @@ -290,7 +251,86 @@ public void send(final String topic, }); } - private void recordSendError(final String topic, final Exception exception, final ProducerRecord serializedRecord) { + private void manageClassCastException(final String topic, + final K key, + final V value, + final Serializer keySerializer, + final Serializer valueSerializer, + final ClassCastException exception) { + final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); + final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); + throw new StreamsException( + String.format( + "ClassCastException while producing data to topic %s. " + + "A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + + "(key type: %s / value type: %s). 
" + + "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + topic, + keySerializer.getClass().getName(), + valueSerializer.getClass().getName(), + keyClass, + valueClass), + exception); + } + + private void manageOtherExceptions(final String topic, + final K key, + final V value, + final Headers headers, + final Integer partition, + final Long timestamp, + final String processorNodeId, + final InternalProcessorContext context, + final Exception exception, + final ProductionExceptionHandler.SerializationExceptionOrigin origin) { + final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); + final ProductionExceptionHandler.ProductionExceptionHandlerResponse response; + + log.debug(String.format("Error serializing record to topic %s", topic), exception); + + try { + final ErrorHandlerContextImpl errorHandlerContext = new ErrorHandlerContextImpl( + context, + record.topic(), + record.partition(), + context.offset(), + record.headers(), + null, + null, + processorNodeId, + taskId); + response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin); + } catch (final Exception e) { + log.error("Fatal when handling serialization exception", e); + recordSendError(topic, e, null, context, processorNodeId); + return; + } + + if (response == ProductionExceptionHandlerResponse.FAIL) { + throw new StreamsException( + String.format( + "Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", + topic, + partition, + timestamp), + exception + ); + } + + log.warn("Unable to serialize record, continue processing. 
" + + "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", + topic, + partition, + timestamp); + + droppedRecordsSensor.record(); + + } + + private void recordSendError(final String topic, final Exception exception, final ProducerRecord serializedRecord, + final InternalProcessorContext context, final String processorNodeId) { String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception.toString()); if (isFatalException(exception)) { @@ -311,7 +351,18 @@ private void recordSendError(final String topic, final Exception exception, fina "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors"; sendException.set(new TaskCorruptedException(Collections.singleton(taskId))); } else { - if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { + final ErrorHandlerContextImpl errorHandlerContext = new ErrorHandlerContextImpl( + null, + serializedRecord.topic(), + serializedRecord.partition(), + context != null ? 
context.offset() : -1, + serializedRecord.headers(), + null, + null, + processorNodeId, + taskId); + if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { + errorMessage += "\nException handler chose to FAIL the processing; no more records will be sent."; sendException.set(new StreamsException(errorMessage, exception)); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 9c56a138acecd..3cd46ca143820 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -68,7 +68,7 @@ ConsumerRecord deserialize(final ProcessorContext processo Optional.empty() ); } catch (final Exception deserializationException) { - handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor); + handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode.name()); return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null' } } @@ -78,13 +78,21 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa final Exception deserializationException, final ConsumerRecord rawRecord, final Logger log, - final Sensor droppedRecordsSensor) { + final Sensor droppedRecordsSensor, + final String sourceNodeName) { final DeserializationExceptionHandler.DeserializationHandlerResponse response; try { - response = deserializationExceptionHandler.handle( + final ErrorHandlerContextImpl errorHandlerContext = new ErrorHandlerContextImpl( 
(InternalProcessorContext) processorContext, - rawRecord, - deserializationException); + rawRecord.topic(), + rawRecord.partition(), + rawRecord.offset(), + rawRecord.headers(), + rawRecord.key(), + rawRecord.value(), + sourceNodeName, + processorContext.taskId()); + response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException); + } catch (final Exception fatalUserException) { + log.error( + "Deserialization error callback failed after deserialization error for record {}", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 297c287673892..b57028d635ba8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -51,6 +51,8 @@ public class RecordQueue { private final ArrayDeque> fifoQueue; private StampedRecord headRecord = null; + private ConsumerRecord rawHeadRecord; + private long partitionTime = UNKNOWN; private final Sensor droppedRecordsSensor; @@ -114,6 +116,15 @@ public TopicPartition partition() { return partition; } + /** + * Returns the raw (not yet deserialized) head record of the queue + * + * @return the raw head {@code ConsumerRecord}, or {@code null} if none is buffered + */ + public ConsumerRecord rawHeadRecord() { + return rawHeadRecord; + } + /** + * Add a batch of {@link ConsumerRecord} into the queue + * @@ -142,6 +153,7 @@ public StampedRecord poll(final long wallClockTime) { headRecord = null; headRecordSizeInBytes = 0L; + rawHeadRecord = null; partitionTime = Math.max(partitionTime, recordToReturn.timestamp); updateHead(); @@ -188,6 +200,7 @@ public void clear() { fifoQueue.clear(); headRecord = null; headRecordSizeInBytes = 0L; + rawHeadRecord = null; partitionTime = UNKNOWN; } @@ -232,12 +245,14 @@ private void updateHead() { } headRecord = new StampedRecord(deserialized, timestamp); headRecordSizeInBytes = 
consumerRecordSizeInBytes(raw); + rawHeadRecord = raw; } // if all records in the FIFO queue are corrupted, make the last one the headRecord // This record is used to update the offsets. See KAFKA-6502 for more details. if (headRecord == null && lastCorruptedRecord != null) { headRecord = new CorruptedRecord(lastCorruptedRecord); + rawHeadRecord = lastCorruptedRecord; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestrictiveProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestrictiveProcessorContext.java new file mode 100644 index 0000000000000..268fe237455da --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestrictiveProcessorContext.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; + +import java.io.File; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; + +/** + * {@code ProcessorContext} implementation that delegates read-only accessors and throws on any call that could mutate state or forward records. + */ +public final class RestrictiveProcessorContext implements ProcessorContext { + private final ProcessorContext delegate; + + private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, " + + "as the framework must ensure the key is not changed (#forward allows changing the key on " + + "messages which are sent). 
Try another function, which doesn't allow the key to be changed " + + "(for example - #transformValues)."; + + public RestrictiveProcessorContext(final ProcessorContext delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + } + + @Override + public String topic() { + return delegate.topic(); + } + + @Override + public int partition() { + return delegate.partition(); + } + + @Override + public long offset() { + return delegate.offset(); + } + + @Override + public Headers headers() { + return delegate.headers(); + } + + @Override + public TaskId taskId() { + return delegate.taskId(); + } + + @Override + public String applicationId() { + return delegate.applicationId(); + } + + @Override + public Serde keySerde() { + return delegate.keySerde(); + } + + @Override + public Serde valueSerde() { + return delegate.valueSerde(); + } + + @Override + public File stateDir() { + return delegate.stateDir(); + } + + @Override + public StreamsMetrics metrics() { + return delegate.metrics(); + } + + @Override + public long currentSystemTimeMs() { + return delegate.currentSystemTimeMs(); + } + + @Override + public long currentStreamTimeMs() { + return delegate.currentStreamTimeMs(); + } + + @Override + public long timestamp() { + return delegate.timestamp(); + } + + @Override + public Map appConfigs() { + return delegate.appConfigs(); + } + + @Override + public Map appConfigsWithPrefix(final String prefix) { + return delegate.appConfigsWithPrefix(prefix); + } + + @Override + public void register(final StateStore store, + final StateRestoreCallback stateRestoreCallback) { + throw new StreamsException(EXPLANATION); + } + + @Override + public S getStateStore(final String name) { + throw new StreamsException(EXPLANATION); + } + + @Override + public Cancellable schedule(final Duration interval, + final PunctuationType type, + final Punctuator callback) throws IllegalArgumentException { + throw new StreamsException(EXPLANATION); + } + + @Override + public void 
forward(final K key, final V value) { + throw new StreamsException(EXPLANATION); + } + + @Override + public void forward(final K key, final V value, final To to) { + throw new StreamsException(EXPLANATION); + } + + @Override + public void commit() { + throw new StreamsException(EXPLANATION); + } + + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 163e2c0997e6d..1d0a9967600ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -29,11 +29,13 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.ErrorHandlerContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TaskId; @@ -62,6 +64,7 @@ import static java.util.Collections.singleton; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; /** @@ -99,6 +102,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final Sensor 
restoreRemainingSensor; private final Sensor punctuateLatencySensor; private final Sensor bufferedRecordsSensor; + private final Sensor droppedRecordsSensor; private final Map e2eLatencySensors = new HashMap<>(); private final RecordQueueCreator recordQueueCreator; @@ -107,6 +111,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, protected final InternalProcessorContext processorContext; private StampedRecord record; + private ConsumerRecord rawRecord; + private boolean commitNeeded = false; private boolean commitRequested = false; private boolean hasPendingTxCommit = false; @@ -157,6 +163,7 @@ public StreamTask(final TaskId id, processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics); punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics); bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics); + droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId, streamsMetrics); for (final String terminalNodeName : topology.terminalNodes()) { e2eLatencySensors.put( @@ -764,6 +771,7 @@ public boolean process(final long wallClockTime) { // get the next record to process record = partitionGroup.nextRecord(recordInfo, wallClockTime); + rawRecord = partitionGroup.rawHeadRecord(); // if there is no record to process, return immediately if (record == null) { @@ -907,7 +915,28 @@ public void punctuate(final ProcessorNode node, } catch (final StreamsException e) { throw e; } catch (final RuntimeException e) { - throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e); + final ErrorHandlerContext errorHandlerContext = new ErrorHandlerContextImpl(recordContext.topic(), + recordContext.partition(), recordContext.offset(), recordContext.headers(), null, null, node.name(), id); + final ProcessingExceptionHandler.ProcessingHandlerResponse response = 
config.processingExceptionHandler + .handle(errorHandlerContext, null, e); + + if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { + throw new StreamsException(String.format("%sException caught while punctuating processor '%s'. " + + "Processing exception handler is set to fail upon" + + " a processing error. If you would rather have the streaming pipeline" + + " continue after a processing error, please set the " + + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", logPrefix, node.name()), + e); + } else { + log.warn( + "Skipping punctuation due to processing error. topic=[{}] partition=[{}] offset=[{}]", + recordContext.topic(), + recordContext.partition(), + recordContext.offset(), + e + ); + droppedRecordsSensor.record(); + } } finally { processorContext.setCurrentNode(null); } @@ -1314,6 +1343,10 @@ long streamTime() { return partitionGroup.streamTime(); } + ConsumerRecord rawRecord() { + return rawRecord; + } + private class RecordQueueCreator { private final LogContext logContext; private final TimestampExtractor defaultTimestampExtractor; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java index 3e544c432a285..c63d6d321d378 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SynchronizedPartitionGroup.java @@ -69,6 +69,11 @@ synchronized Long headRecordOffset(final TopicPartition partition) { return wrapped.headRecordOffset(partition); } + @Override + ConsumerRecord rawHeadRecord() { + return wrapped.rawHeadRecord(); + } + @Override synchronized int numBuffered() { return wrapped.numBuffered(); diff --git a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java 
b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java index be1e98e4a7154..61fea5f27bb40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java +++ b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.streams.processor.ErrorHandlerContext; import java.util.Map; @@ -26,14 +27,17 @@ */ public class AlwaysContinueProductionExceptionHandler implements ProductionExceptionHandler { @Override - public ProductionExceptionHandlerResponse handle(final ProducerRecord record, + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.CONTINUE; } @Override - public ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, - final Exception exception) { + public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { return ProductionExceptionHandlerResponse.CONTINUE; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java new file mode 100644 index 0000000000000..32b5512b1fa47 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.processor.ErrorHandlerContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import 
java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; + +@Category(IntegrationTest.class) +public class ProcessingExceptionHandlerIntegrationTest { + private final String threadId = Thread.currentThread().getName(); + + @org.junit.Test + public void shouldContinueInProcessorOnProcessingRecordAtBeginningExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4") + ); + + final List> expected = Arrays.asList( + new KeyValueTimestamp<>("ID123-2", "ID123-A2", 0), + new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0), + new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO); + + assertEquals(expected.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, 
processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(1.0, driver.metrics().get(dropTotal).metricValue()); + assertTrue((Double) driver.metrics().get(dropRate).metricValue() > 0.0); + } + } + + @org.junit.Test + public void shouldContinueInProcessorOnProcessingRecordInMiddleExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4") + ); + + final List> expected = Arrays.asList( + new KeyValueTimestamp<>("ID123-2", "ID123-A2", 0), + new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0), + new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO); + + assertEquals(expected.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(1.0, driver.metrics().get(dropTotal).metricValue()); + assertTrue((Double) 
driver.metrics().get(dropRate).metricValue() > 0.0); + } + } + + @org.junit.Test + public void shouldContinueInProcessorOnProcessingRecordAtEndExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4"), + new KeyValue<>("ID123-1", "ID123-A1") + ); + + final List> expected = Arrays.asList( + new KeyValueTimestamp<>("ID123-2", "ID123-A2", 0), + new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0), + new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO); + + assertEquals(expected.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(1.0, driver.metrics().get(dropTotal).metricValue()); + assertTrue((Double) driver.metrics().get(dropRate).metricValue() > 0.0); + } + } + + @org.junit.Test + public void shouldFailInProcessorOnProcessingRecordAtBeginningExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-2", 
"ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4") + ); + + final List> expected = Collections.emptyList(); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + + final RuntimeException exception = assertThrows(RuntimeException.class, + () -> inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO)); + + assertEquals("Exception should be handled by processing exception handler", exception.getCause().getMessage()); + assertEquals(0, processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(0.0, driver.metrics().get(dropTotal).metricValue()); + assertEquals(0.0, driver.metrics().get(dropRate).metricValue()); + } + } + + @org.junit.Test + public void shouldFailInProcessorOnProcessingRecordInMiddleExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4") + ); + + final List> expected = Collections.singletonList( + new KeyValueTimestamp<>("ID123-2", "ID123-A2", 0) + ); + + final MockProcessorSupplier processor = 
new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + + final RuntimeException exception = assertThrows(RuntimeException.class, + () -> inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO)); + + assertEquals("Exception should be handled by processing exception handler", exception.getCause().getMessage()); + assertEquals(expected.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(0.0, driver.metrics().get(dropTotal).metricValue()); + assertEquals(0.0, driver.metrics().get(dropRate).metricValue()); + } + } + + @org.junit.Test + public void shouldFailInProcessorOnProcessingRecordAtEndExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4"), + new KeyValue<>("ID123-1", "ID123-A1") + ); + + final List> expected = Arrays.asList( + new KeyValueTimestamp<>("ID123-2", "ID123-A2", 0), + new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0), + new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + 
builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + + final RuntimeException exception = assertThrows(RuntimeException.class, + () -> inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO)); + + assertEquals("Exception should be handled by processing exception handler", exception.getCause().getMessage()); + assertEquals(expected.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(0.0, driver.metrics().get(dropTotal).metricValue()); + assertEquals(0.0, driver.metrics().get(dropRate).metricValue()); + } + } + + @org.junit.Test + public void shouldContinueOnPunctuateExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4") + ); + + final List> expected = Arrays.asList( + new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0), + new KeyValueTimestamp<>("ID123-2", "ID123-A2", 0), + new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0), + new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", 
Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorPunctuateProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinuePunctuateProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO); + driver.advanceWallClockTime(Duration.ofSeconds(2)); + + assertEquals(expected.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(1.0, driver.metrics().get(dropTotal).metricValue()); + assertTrue((Double) driver.metrics().get(dropRate).metricValue() > 0.0); + } + } + + @Test + public void shouldFailOnPunctuateExceptions() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3") + ); + + final List> expected = Arrays.asList( + new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0), + new KeyValueTimestamp<>("ID123-2", "ID123-A2", 0), + new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .process(runtimeErrorPunctuateProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
FailPunctuateProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + + final RuntimeException exception = assertThrows(RuntimeException.class, + () -> { + inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO); + driver.advanceWallClockTime(Duration.ofSeconds(2)); + inputTopic.pipeInput("ID123-4", "ID123-A4", 0); + }); + + assertEquals("Exception should be handled by processing exception handler", exception.getCause().getMessage()); + assertEquals(expected.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expected, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(0.0, driver.metrics().get(dropTotal).metricValue()); + assertEquals(0.0, driver.metrics().get(dropRate).metricValue()); + } + } + + public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { + @Override + public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertEquals("ID123-1", new String(context.sourceRawKey())); + assertEquals("ID123-A1", new String(context.sourceRawValue())); + assertEquals("ID123-1", record.key()); + assertEquals("ID123-A1", record.value()); + assertEquals("TOPIC_NAME", context.topic()); + assertEquals("KSTREAM-PROCESSOR-0000000001", context.processorNodeId()); + assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler")); + + return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE; + } + + @Override + public void configure(final Map configs) { + // No-op + } + } + + 
public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { + @Override + public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertEquals("ID123-1", new String(context.sourceRawKey())); + assertEquals("ID123-A1", new String(context.sourceRawValue())); + assertEquals("ID123-1", record.key()); + assertEquals("ID123-A1", record.value()); + assertEquals("TOPIC_NAME", context.topic()); + assertEquals("KSTREAM-PROCESSOR-0000000001", context.processorNodeId()); + assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler")); + + return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL; + } + + @Override + public void configure(final Map configs) { + // No-op + } + } + + public static class ContinuePunctuateProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { + @Override + public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertNull(context.sourceRawKey()); + assertNull(context.sourceRawValue()); + assertNull(record); + assertNull(context.topic()); + assertEquals("KSTREAM-PROCESSOR-0000000001", context.processorNodeId()); + assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler")); + + return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE; + } + + @Override + public void configure(final Map configs) { + // No-op + } + } + + public static class FailPunctuateProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { + @Override + public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertNull(context.sourceRawKey()); + assertNull(context.sourceRawValue()); + assertNull(record); + 
assertNull(context.topic()); + assertEquals("KSTREAM-PROCESSOR-0000000001", context.processorNodeId()); + assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler")); + + return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL; + } + + @Override + public void configure(final Map configs) { + // No-op + } + } + + /** + * Metric name for dropped records total. + * + * @return the metric name + */ + private MetricName droppedRecordsTotalMetric() { + return new MetricName( + "dropped-records-total", + "stream-task-metrics", + "The total number of dropped records", + mkMap( + mkEntry("thread-id", threadId), + mkEntry("task-id", "0_0") + ) + ); + } + + /** + * Metric name for dropped records rate. + * + * @return the metric name + */ + private MetricName droppedRecordsRateMetric() { + return new MetricName( + "dropped-records-rate", + "stream-task-metrics", + "The average number of dropped records per second", + mkMap( + mkEntry("thread-id", threadId), + mkEntry("task-id", "0_0") + ) + ); + } + + /** + * Processor supplier that throws a runtime exception on schedule. + * + * @return the processor supplier + */ + private org.apache.kafka.streams.processor.api.ProcessorSupplier runtimeErrorPunctuateProcessorSupplierMock() { + return () -> new org.apache.kafka.streams.processor.api.Processor() { + org.apache.kafka.streams.processor.api.ProcessorContext context; + + @Override + public void init(final org.apache.kafka.streams.processor.api.ProcessorContext context) { + this.context = context; + this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, ts -> { + throw new RuntimeException("Exception should be handled by processing exception handler"); + }); + } + + @Override + public void process(final Record record) { + context.forward(record); + } + }; + } + + /** + * Processor supplier that throws a runtime exception on process. 
+ * + * @return the processor supplier + */ + private ProcessorSupplier runtimeErrorProcessorSupplierMock() { + return () -> new ContextualProcessor() { + @Override + public void process(final Record record) { + if (record.key().equals("ID123-1")) { + throw new RuntimeException("Exception should be handled by processing exception handler"); + } + + context().forward(new Record<>(record.key(), record.value(), record.timestamp())); + } + }; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 6bee7f32c24fa..4abd5f240dae9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -16,20 +16,29 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ErrorHandlerContext; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.StateStore; import 
org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.query.Position; @@ -63,6 +72,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER; import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTES_KEY_SERIALIZER; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -82,6 +92,10 @@ public class ProcessorContextImplTest { private final StreamsConfig streamsConfig = streamsConfigMock(); + private final Metrics metrics = new Metrics(); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + private final String threadId = Thread.currentThread().getName(); + @Mock private RecordCollector recordCollector; @Mock @@ -155,7 +169,7 @@ public void setup() { mock(TaskId.class), streamsConfig, stateManager, - mock(StreamsMetricsImpl.class), + streamsMetrics, mock(ThreadCache.class) ); @@ -187,7 +201,7 @@ private ProcessorContextImpl getStandbyContext() { mock(TaskId.class), streamsConfig, stateManager, - mock(StreamsMetricsImpl.class), + streamsMetrics, mock(ThreadCache.class) ); } @@ -426,7 +440,7 @@ public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() { mock(TaskId.class), streamsConfigWithConsistencyMock(), stateManager, - mock(StreamsMetricsImpl.class), + streamsMetrics, mock(ThreadCache.class) ); @@ -597,6 +611,174 
@@ public void shouldSetAndGetProcessorMetaData() { assertThrows(NullPointerException.class, () -> context.setProcessorMetadata(null)); } + @Test + public void shouldContinueOnProcessingExceptions() { + when(streamsConfig.processingExceptionHandler()) + .thenReturn(processingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE)); + + final TaskId taskId = mock(TaskId.class); + when(taskId.toString()).thenReturn("0_0"); + + final StreamTask task = mock(StreamTask.class); + when(task.rawRecord()).thenReturn(new ConsumerRecord<>("topic", 0, 0, KEY_BYTES.get(), VALUE_BYTES)); + + final Metrics metrics = new Metrics(); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + final ProcessorContextImpl context = new ProcessorContextImpl( + taskId, + streamsConfig, + stateManager, + streamsMetrics, + mock(ThreadCache.class) + ); + + context.transitionToActive(task, null, null); + + final ProcessorNode processorNode = new ProcessorNode<>( + "fake", + (org.apache.kafka.streams.processor.api.Processor) null, + Collections.emptySet() + ); + + final ProcessorNode childProcessorNode = new ProcessorNode<>( + "fakeChild", + (Processor) record -> { + throw new RuntimeException("Exception should be handled by processing exception handler"); + }, + Collections.emptySet() + ); + + processorNode.init(context); + childProcessorNode.init(context); + processorNode.addChild(childProcessorNode); + + context.setCurrentNode(processorNode); + + context.forward("key", "value"); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(1.0, metrics.metrics().get(dropTotal).metricValue()); + assertTrue((Double) metrics.metrics().get(dropRate).metricValue() > 0.0); + } + + @Test + public void shouldFailOnProcessingExceptions() { + when(streamsConfig.processingExceptionHandler()) + 
.thenReturn(processingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL)); + + final TaskId taskId = mock(TaskId.class); + when(taskId.toString()).thenReturn("0_0"); + + final StreamTask task = mock(StreamTask.class); + when(task.rawRecord()).thenReturn(new ConsumerRecord<>("topic", 0, 0, KEY_BYTES.get(), VALUE_BYTES)); + + final Metrics metrics = new Metrics(); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime()); + final ProcessorContextImpl context = new ProcessorContextImpl( + taskId, + streamsConfig, + stateManager, + streamsMetrics, + mock(ThreadCache.class) + ); + + context.transitionToActive(task, null, null); + + final ProcessorNode processorNode = new ProcessorNode<>( + "fake", + (org.apache.kafka.streams.processor.api.Processor) null, + Collections.emptySet() + ); + + final ProcessorNode childProcessorNode = new ProcessorNode<>( + "fakeChild", + (Processor) record -> { + throw new RuntimeException("Exception should be handled by processing exception handler"); + }, + Collections.emptySet() + ); + + processorNode.init(context); + childProcessorNode.init(context); + processorNode.addChild(childProcessorNode); + + context.setCurrentNode(processorNode); + + final StreamsException exception = assertThrows(StreamsException.class, () -> context.forward("key", "value")); + assertEquals("Processing exception handler is set to fail upon a processing error. 
" + + "If you would rather have the streaming pipeline continue after a processing error, " + + "please set the processing.exception.handler appropriately.", exception.getMessage()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(0.0, metrics.metrics().get(dropTotal).metricValue()); + assertEquals(0.0, metrics.metrics().get(dropRate).metricValue()); + } + + /** + * Processing exception handler mock + * + * @param response the processing handler response + * @return the processing exception handler mock + */ + private ProcessingExceptionHandler processingExceptionHandlerMock(final ProcessingExceptionHandler.ProcessingHandlerResponse response) { + return new ProcessingExceptionHandler() { + @Override + public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertArrayEquals(KEY_BYTES.get(), context.sourceRawKey()); + assertArrayEquals(VALUE_BYTES, context.sourceRawValue()); + assertEquals("fakeChild", context.processorNodeId()); + assertEquals("key", record.key()); + assertEquals("value", record.value()); + assertEquals("Exception should be handled by processing exception handler", exception.getMessage()); + + return response; + } + + @Override + public void configure(final Map configs) { + // No-op + } + }; + } + + /** + * Metric name for dropped records total. + * + * @return the metric name + */ + private MetricName droppedRecordsTotalMetric() { + return new MetricName( + "dropped-records-total", + "stream-task-metrics", + "The total number of dropped records", + mkMap( + mkEntry("thread-id", threadId), + mkEntry("task-id", "0_0") + ) + ); + } + + /** + * Metric name for dropped records rate. 
+ * + * @return the metric name + */ + private MetricName droppedRecordsRateMetric() { + return new MetricName( + "dropped-records-rate", + "stream-task-metrics", + "The average number of dropped records per second", + mkMap( + mkEntry("thread-id", threadId), + mkEntry("task-id", "0_0") + ) + ); + } + @SuppressWarnings("unchecked") private KeyValueStore keyValueStoreMock() { final KeyValueStore keyValueStoreMock = mock(KeyValueStore.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index dfa3f9e422a75..eea4e63c1c777 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -139,7 +139,7 @@ public void testTopologyLevelClassCastException() { final TestInputTopic topic = testDriver.createInputTopic("streams-plaintext-input", new StringSerializer(), new StringSerializer()); final StreamsException se = assertThrows(StreamsException.class, () -> topic.pipeInput("a-key", "a value")); - final String msg = se.getMessage(); + final String msg = se.getCause().getMessage(); assertTrue("Error about class cast with serdes", msg.contains("ClassCastException")); assertTrue("Error about class cast with serdes", msg.contains("Serdes")); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index 448ceaf67014a..cd21b6ed32375 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -24,15 +24,22 @@ import org.apache.kafka.common.metrics.Metrics; import 
org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.test.InternalMockProcessorContext; import org.junit.Test; import java.util.Optional; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertNull; + public class RecordDeserializerTest { - private final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())}); + private final RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); private final ConsumerRecord rawRecord = new ConsumerRecord<>("topic", 1, 1, @@ -45,6 +52,8 @@ public class RecordDeserializerTest { headers, Optional.empty()); + private final InternalProcessorContext context = new InternalMockProcessorContext<>(); + @Test public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() { final RecordDeserializer recordDeserializer = new RecordDeserializer( @@ -68,6 +77,70 @@ public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() { assertEquals(rawRecord.headers(), record.headers()); } + @Test + public void shouldThrowExceptionWithKeyDeserializationAndFail() { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + true, + false, + "key", "value" + ), + new LogAndFailExceptionHandler(), + new LogContext(), + new Metrics().sensor("dropped-records") + ); + final StreamsException e = assertThrows(StreamsException.class, () -> recordDeserializer.deserialize(context, rawRecord)); + assertEquals(e.getMessage(), "Deserialization exception handler is set to fail upon a deserialization error. 
If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately."); + } + + @Test + public void shouldThrowExceptionWithValueDeserializationAndFail() { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + false, + true, + "key", "value" + ), + new LogAndFailExceptionHandler(), + new LogContext(), + new Metrics().sensor("dropped-records") + ); + final StreamsException e = assertThrows(StreamsException.class, () -> recordDeserializer.deserialize(context, rawRecord)); + assertEquals(e.getMessage(), "Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately."); + } + + @Test + public void shouldThrowExceptionWithKeyDeserializationAndContinue() { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + true, + false, + "key", "value" + ), + new LogAndContinueExceptionHandler(), + new LogContext(), + new Metrics().sensor("dropped-records") + ); + final ConsumerRecord record = recordDeserializer.deserialize(context, rawRecord); + assertNull(record); + } + + @Test + public void shouldThrowExceptionWithValueDeserializationAndContinue() { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + false, + true, + "key", "value" + ), + new LogAndContinueExceptionHandler(), + new LogContext(), + new Metrics().sensor("dropped-records") + ); + final ConsumerRecord record = recordDeserializer.deserialize(context, rawRecord); + assertNull(record); + } + static class TheSourceNode extends SourceNode { private final boolean keyThrowsException; private final boolean valueThrowsException;