From 8eb130092bf5b92f1039d986335151bbf08edff6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Mon, 27 May 2024 15:21:38 +0200 Subject: [PATCH] KAFKA-16448: Catch and handle processing exceptions Co-authored-by: Dabz Co-authored-by: sebastienviale Co-authored-by: loicgreffier --- .../apache/kafka/streams/TopologyConfig.java | 8 + .../internals/DefaultErrorHandlerContext.java | 3 + .../internals/FailedProcessingException.java | 31 +++ .../processor/internals/CorruptedRecord.java | 2 +- .../internals/GlobalStateManagerImpl.java | 3 +- .../internals/GlobalStateUpdateTask.java | 3 +- .../processor/internals/ProcessorAdapter.java | 3 +- .../internals/ProcessorContextImpl.java | 3 +- .../processor/internals/ProcessorNode.java | 52 ++++ .../internals/ProcessorRecordContext.java | 18 +- .../processor/internals/RecordQueue.java | 2 +- .../streams/processor/internals/SinkNode.java | 3 +- .../processor/internals/StampedRecord.java | 18 +- .../processor/internals/StreamTask.java | 21 +- ...essingExceptionHandlerIntegrationTest.java | 239 ++++++++++++++++++ .../internals/ProcessorNodeTest.java | 193 +++++++++++++- 16 files changed, 573 insertions(+), 29 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index f398f1500740c..f53f7f72bc791 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; @@ -53,6 +54,7 @@ import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY; import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB; import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC; @@ -136,6 +138,7 @@ public class TopologyConfig extends AbstractConfig { public final Class dslStoreSuppliers; public final Supplier timestampExtractorSupplier; public final Supplier deserializationExceptionHandlerSupplier; + public final Supplier processingExceptionHandlerSupplier; public TopologyConfig(final StreamsConfig globalAppConfigs) { this(null, globalAppConfigs, new Properties()); @@ -151,6 +154,7 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo this.applicationConfigs = globalAppConfigs; this.topologyOverrides = topologyOverrides; + this.processingExceptionHandlerSupplier = () ->
globalAppConfigs.getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); if (isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides)) { maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); @@ -281,6 +285,7 @@ public TaskConfig getTaskConfig() { maxBufferedSize, timestampExtractorSupplier.get(), deserializationExceptionHandlerSupplier.get(), + processingExceptionHandlerSupplier.get(), eosEnabled ); } @@ -291,6 +296,7 @@ public static class TaskConfig { public final int maxBufferedSize; public final TimestampExtractor timestampExtractor; public final DeserializationExceptionHandler deserializationExceptionHandler; + public final ProcessingExceptionHandler processingExceptionHandler; public final boolean eosEnabled; private TaskConfig(final long maxTaskIdleMs, @@ -298,12 +304,14 @@ private TaskConfig(final long maxTaskIdleMs, final int maxBufferedSize, final TimestampExtractor timestampExtractor, final DeserializationExceptionHandler deserializationExceptionHandler, + final ProcessingExceptionHandler processingExceptionHandler, final boolean eosEnabled) { this.maxTaskIdleMs = maxTaskIdleMs; this.taskTimeoutMs = taskTimeoutMs; this.maxBufferedSize = maxBufferedSize; this.timestampExtractor = timestampExtractor; this.deserializationExceptionHandler = deserializationExceptionHandler; + this.processingExceptionHandler = processingExceptionHandler; this.eosEnabled = eosEnabled; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index 4723f247db603..fc6b6048cb902 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -20,6 +20,9 @@ import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.processor.TaskId; +/** + * Default implementation of {@link ErrorHandlerContext} that provides access to the metadata of the record that caused the error. + */ public class DefaultErrorHandlerContext implements ErrorHandlerContext { private final String topic; private final int partition; diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java new file mode 100644 index 0000000000000..81b2a2d4fb1fb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.errors.internals; + +import org.apache.kafka.common.KafkaException; + +/** + * {@link FailedProcessingException} is an internal exception wrapping an error that occurred during + * {@link org.apache.kafka.streams.processor.internals.ProcessorNode ProcessorNode} processing, thrown when the configured + * {@link org.apache.kafka.streams.errors.ProcessingExceptionHandler ProcessingExceptionHandler} decides to fail. + */ +public class FailedProcessingException extends KafkaException { + private static final long serialVersionUID = 1L; + + public FailedProcessingException(final Throwable throwable) { + super(throwable); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java index d31a29883cabe..1bc8cb51092c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java @@ -28,7 +28,7 @@ public class CorruptedRecord extends StampedRecord { CorruptedRecord(final ConsumerRecord rawRecord) { - super(rawRecord, ConsumerRecord.NO_TIMESTAMP); + super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 6b7214a9ed185..b1263ddc58df6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -307,7 +307,8 @@ private void reprocessState(final List topicPartitions, record.offset(), record.partition(), record.topic(), - record.headers()); + record.headers(), + record); globalProcessorContext.setRecordContext(recordContext); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 12a6beedbcd98..1713efb52a9bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -113,7 +113,8 @@ public void update(final ConsumerRecord record) { deserialized.offset(), deserialized.partition(), deserialized.topic(), - deserialized.headers()); + deserialized.headers(), + record); processorContext.setRecordContext(recordContext); processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode()); final Record toProcess = new Record<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index 79db3847cfb06..a5d88f5a7f8a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -66,7 +66,8 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers() + record.headers(), + processorRecordContext.rawRecord() )); delegate.process(record.key(), record.value()); } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index b484d26f0fe87..6a79434b622ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -261,7 +261,8 @@ public void forward(final Record record, final String childName) { recordContext.offset(), recordContext.partition(), recordContext.topic(), - record.headers()); + record.headers(), + recordContext.rawRecord()); } if (childName == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index fbd976cb11274..eabf9e3d5c471 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -16,13 +16,24 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; +import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; +import org.apache.kafka.streams.errors.internals.FailedProcessingException; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -30,8 +41,11 @@ import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; + public class ProcessorNode { + private final Logger log = LoggerFactory.getLogger(ProcessorNode.class); private final List> children; private final Map> childByName; @@ -40,12 +54,15 @@ public class ProcessorNode { private final String name; public final Set stateStores; + private ProcessingExceptionHandler processingExceptionHandler; private InternalProcessorContext internalProcessorContext; private String threadId; private boolean closed = true; + private Sensor droppedRecordsSensor; + public ProcessorNode(final String name) { this(name, (Processor) null, null); } @@ -98,6 +115,10 @@ public void init(final InternalProcessorContext context) { try { threadId = Thread.currentThread().getName(); internalProcessorContext = context; + droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, + internalProcessorContext.taskId().toString(), + internalProcessorContext.metrics()); + if (processor != null) { processor.init(context); } @@ -115,6 +136,11 @@ public void init(final InternalProcessorContext context) { closed = false; } + public void init(final InternalProcessorContext context, final ProcessingExceptionHandler processingExceptionHandler) { + init(context); + this.processingExceptionHandler = 
processingExceptionHandler; + } + public void close() { throwIfClosed(); @@ -174,6 +200,32 @@ public void process(final Record record) { keyClass, valueClass), e); + } catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) { + // Rethrow exceptions that should not be handled here + throw e; + } catch (final Exception e) { + final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + internalProcessorContext.topic(), + internalProcessorContext.partition(), + internalProcessorContext.offset(), + internalProcessorContext.headers(), + internalProcessorContext.recordContext().rawRecord().key(), + internalProcessorContext.recordContext().rawRecord().value(), + internalProcessorContext.currentNode().name(), + internalProcessorContext.taskId()); + + final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler + .handle(errorHandlerContext, record, e); + + if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { + log.error("Processing exception handler is set to fail upon" + + " a processing error. If you would rather have the streaming pipeline" + + " continue after a processing error, please set the " + + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately."); + throw new FailedProcessingException(e); + } else { + droppedRecordsSensor.record(); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 839baaad87528..3d1ce0529e678 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; @@ -37,17 +38,28 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata { private final String topic; private final int partition; private final Headers headers; + private final ConsumerRecord rawRecord; public ProcessorRecordContext(final long timestamp, final long offset, final int partition, final String topic, final Headers headers) { + this(timestamp, offset, partition, topic, headers, null); + } + + public ProcessorRecordContext(final long timestamp, + final long offset, + final int partition, + final String topic, + final Headers headers, + final ConsumerRecord rawRecord) { this.timestamp = timestamp; this.offset = offset; this.topic = topic; this.partition = partition; this.headers = Objects.requireNonNull(headers); + this.rawRecord = rawRecord; } @Override @@ -75,6 +87,10 @@ public Headers headers() { return headers; } + public ConsumerRecord rawRecord() { + return rawRecord; + } + public long residentMemorySizeEstimate() { long size = 0; size += Long.BYTES; // value.context.timestamp @@ -173,7 +189,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) { headers = new RecordHeaders(headerArr); } - return new ProcessorRecordContext(timestamp, offset, partition, topic, headers); + return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null); } @Override diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index a6b30a07ef96d..a3c9ea67f067d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -230,7 +230,7 @@ private void updateHead() { droppedRecordsSensor.record(); continue; } - headRecord = new StampedRecord(deserialized, timestamp); + headRecord = new StampedRecord(deserialized, timestamp, raw); headRecordSizeInBytes = consumerRecordSizeInBytes(raw); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 6e79616d30a9c..871cb2284ee4d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -88,7 +88,8 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers() + record.headers(), + context.recordContext().rawRecord() ); final String topic = topicExtractor.extract(key, value, contextForExtraction); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java index 71e3ca2e3ceca..d82cd98ed7eec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java @@ -20,9 +20,11 @@ import org.apache.kafka.common.header.Headers; public class StampedRecord extends Stamped> { + private final ConsumerRecord rawRecord; - public StampedRecord(final ConsumerRecord record, final long timestamp) { + public StampedRecord(final ConsumerRecord record, final long timestamp, final ConsumerRecord rawRecord) { super(record, timestamp); + this.rawRecord = rawRecord; } public String topic() { @@ -49,6 +51,20 @@ public Headers headers() { return value.headers(); } + public ConsumerRecord rawRecord() { + return rawRecord; + } + + @Override + public boolean equals(final Object other) { + return super.equals(other); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + @Override public String toString() { return value.toString() + ", timestamp = " + timestamp; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index f6ff7f35ad7d5..52d40a34915c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -30,10 +30,12 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.errors.internals.FailedProcessingException; 
import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -105,6 +107,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @SuppressWarnings("rawtypes") protected final InternalProcessorContext processorContext; + private final ProcessingExceptionHandler processingExceptionHandler; private StampedRecord record; private boolean commitNeeded = false; @@ -217,6 +220,7 @@ public StreamTask(final TaskId id, highWatermark.put(topicPartition, -1L); } timeCurrentIdlingStarted = Optional.empty(); + processingExceptionHandler = config.processingExceptionHandler; } // create queues for each assigned partition and associate them @@ -800,6 +804,9 @@ record = null; record = null; throw exception; } catch (final RuntimeException e) { + // Do not keep the failed processing exception in the stack trace + final Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e; + final StreamsException error = new StreamsException( String.format( "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", @@ -808,9 +815,9 @@ record = null; record.topic(), record.partition(), record.offset(), - getStacktraceString(e) + getStacktraceString(processingException) ), - e + processingException ); record = null; @@ -833,7 +840,8 @@ private void doProcess(final long wallClockTime) { record.offset(), record.partition(), record.topic(), - record.headers() + record.headers(), + record.rawRecord() ); updateProcessorContext(currNode, wallClockTime, recordContext); @@ -861,7 +869,7 @@ public void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, fin processTimeMs = 0L; } - private String getStacktraceString(final RuntimeException e) { + private String getStacktraceString(final Throwable e) { String stacktrace = null; try (final StringWriter stringWriter = new StringWriter(); final PrintWriter printWriter = new PrintWriter(stringWriter)) { @@ -894,7 +902,8 @@ public void punctuate(final ProcessorNode node, -1L, -1, null, - new RecordHeaders() + new RecordHeaders(), + null ); updateProcessorContext(node, time.milliseconds(), recordContext); @@ -1020,7 +1029,7 @@ private void initializeTopology() { for (final ProcessorNode node : topology.processors()) { processorContext.setCurrentNode(node); try { - node.init(processorContext); + node.init(processorContext, processingExceptionHandler); } finally { processorContext.setCurrentNode(null); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java new file mode 100644 index 0000000000000..bb29ac64f02e0 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.test.MockProcessorSupplier; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Timeout(600) +@Tag("integration") +public class ProcessingExceptionHandlerIntegrationTest { + private final String threadId = Thread.currentThread().getName(); + + @Test + public void shouldFailWhenProcessingExceptionOccurs() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-2-ERR", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4") + ); + + final List> expectedProcessedRecords = Collections.singletonList( + new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + + final StreamsException exception = assertThrows(StreamsException.class, + () -> inputTopic.pipeKeyValueList(events, Instant.EPOCH, 
Duration.ZERO)); + + assertTrue(exception.getMessage().contains("Exception caught in process. " + + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, " + + "partition=0, offset=1, stacktrace=java.lang.RuntimeException: " + + "Exception should be handled by processing exception handler")); + assertEquals(1, processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(0.0, driver.metrics().get(dropTotal).metricValue()); + assertEquals(0.0, driver.metrics().get(dropRate).metricValue()); + } + } + + @Test + public void shouldContinueWhenProcessingExceptionOccurs() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-2-ERR", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4"), + new KeyValue<>("ID123-5-ERR", "ID123-A5"), + new KeyValue<>("ID123-6", "ID123-A6") + ); + + final List> expectedProcessedRecords = Arrays.asList( + new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0), + new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0), + new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0), + new KeyValueTimestamp<>("ID123-6", "ID123-A6", 0) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO); + + assertEquals(expectedProcessedRecords.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(2.0, driver.metrics().get(dropTotal).metricValue()); + assertTrue((Double) driver.metrics().get(dropRate).metricValue() > 0.0); + } + } + + public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { + @Override + public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertProcessingExceptionHandlerInputs(context, record, exception); + return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE; + } + + @Override + public void configure(final Map configs) { + // No-op + } + } + + public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { + @Override + public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertProcessingExceptionHandlerInputs(context, record, exception); + return 
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL; + } + + @Override + public void configure(final Map configs) { + // No-op + } + } + + private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains(new String(context.sourceRawKey()))); + assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains(new String(context.sourceRawValue()))); + assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String) record.key())); + assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value())); + assertEquals("TOPIC_NAME", context.topic()); + assertEquals("KSTREAM-PROCESSOR-0000000003", context.processorNodeId()); + assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler")); + } + + /** + * Metric name for dropped records total. + * + * @return the metric name + */ + private MetricName droppedRecordsTotalMetric() { + return new MetricName( + "dropped-records-total", + "stream-task-metrics", + "The total number of dropped records", + mkMap( + mkEntry("thread-id", threadId), + mkEntry("task-id", "0_0") + ) + ); + } + + /** + * Metric name for dropped records rate. + * + * @return the metric name + */ + private MetricName droppedRecordsRateMetric() { + return new MetricName( + "dropped-records-rate", + "stream-task-metrics", + "The average number of dropped records per second", + mkMap( + mkEntry("thread-id", threadId), + mkEntry("task-id", "0_0") + ) + ); + } + + /** + * Processor supplier that throws a runtime exception on process. + * + * @return the processor supplier + */ + private ProcessorSupplier runtimeErrorProcessorSupplierMock() { + return () -> new ContextualProcessor() { + @Override + public void process(final Record record) { + if (record.key().contains("ERR")) { + throw new RuntimeException("Exception should be handled by processing exception handler"); + } + + context().forward(new Record<>(record.key(), record.value(), record.timestamp())); + } + }; + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 2f38301dae732..6be033cb2dc08 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.InvalidOffsetException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; @@ -25,7 +29,13 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; +import org.apache.kafka.streams.errors.TaskMigratedException; +import 
org.apache.kafka.streams.errors.internals.FailedProcessingException; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; @@ -34,36 +44,110 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.quality.Strictness; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; +@ExtendWith(MockitoExtension.class) public class ProcessorNodeTest { + private static final String TOPIC = "topic"; + private static final int PARTITION = 0; + private static final Long OFFSET = 0L; + private static final Long TIMESTAMP = 0L; + private static final TaskId TASK_ID = new TaskId(0, 0); + private static final String NAME = "name"; + private static final String KEY = "key"; + private static final String VALUE = "value"; @Test public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() { final ProcessorNode node = - new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet()); + new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet()); assertThrows(StreamsException.class, () -> node.init(null)); } @Test public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { final ProcessorNode node = - new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet()); + new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet()); assertThrows(StreamsException.class, () -> node.init(null)); } + @Test + public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRepliesWithFail() { + final ProcessorNode node = + new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); + + final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext)); + + final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, + () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); + + assertTrue(failedProcessingException.getCause() instanceof RuntimeException); 
+ assertEquals("Processing exception should be caught and handled by the processing exception handler.", + failedProcessingException.getCause().getMessage()); + } + + @Test + public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRepliesWithContinue() { + final ProcessorNode node = + new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); + + final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext)); + + assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); + } + + @ParameterizedTest + @CsvSource({ + "FailedProcessingException,java.lang.RuntimeException,Fail processing", + "TaskCorruptedException,org.apache.kafka.streams.processor.internals.ProcessorNodeTest$IgnoredInternalExceptionsProcessor$1,Invalid offset", + "TaskMigratedException,java.lang.RuntimeException,Task migrated cause" + }) + public void shouldNotHandleInternalExceptionsThrownDuringProcessing(final String ignoredExceptionName, + final Class ignoredExceptionCause, + final String ignoredExceptionCauseMessage) { + final ProcessingExceptionHandler processingExceptionHandler = mock(ProcessingExceptionHandler.class); + + final ProcessorNode node = + new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); + + final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); + node.init(internalProcessorContext, processingExceptionHandler); + + final RuntimeException runtimeException = assertThrows(RuntimeException.class, + () -> node.process(new Record<>(ignoredExceptionName, VALUE, TIMESTAMP))); + + assertEquals(ignoredExceptionCause, runtimeException.getCause().getClass()); + assertEquals(ignoredExceptionCauseMessage, runtimeException.getCause().getMessage()); + verify(processingExceptionHandler, never()).handle(any(), any(), any()); + } + private static class ExceptionalProcessor implements Processor { @Override public void init(final ProcessorContext context) { @@ -87,6 +171,32 @@ public void process(final Record record) { } } + private static class IgnoredInternalExceptionsProcessor implements Processor { + @Override + public void process(final Record record) { + if (record.key().equals("FailedProcessingException")) { + throw new FailedProcessingException(new RuntimeException("Fail processing")); + } + + if (record.key().equals("TaskCorruptedException")) { + final Set tasksIds = new HashSet<>(); + tasksIds.add(new TaskId(0, 0)); + throw new TaskCorruptedException(tasksIds, new InvalidOffsetException("Invalid offset") { + @Override + public Set partitions() { + return new HashSet<>(Collections.singletonList(new TopicPartition("topic", 0))); + } + }); + } + + if (record.key().equals("TaskMigratedException")) { + throw new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause")); + } + + throw new RuntimeException("Processing exception should be caught and handled by the processing exception handler."); + } + } + @Test public void testMetricsWithBuiltInMetricsVersionLatest() { final Metrics metrics = new Metrics(); @@ -94,7 +204,7 @@ public void testMetricsWithBuiltInMetricsVersionLatest() { new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); final InternalMockProcessorContext context = new 
InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode node = - new ProcessorNode<>("name", new NoOpProcessor(), Collections.emptySet()); + new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet()); node.init(context); final String threadId = Thread.currentThread().getName(); @@ -138,7 +248,7 @@ public void testTopologyLevelClassCastException() { try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) { final TestInputTopic topic = testDriver.createInputTopic("streams-plaintext-input", new StringSerializer(), new StringSerializer()); - final StreamsException se = assertThrows(StreamsException.class, () -> topic.pipeInput("a-key", "a value")); + final StreamsException se = assertThrows(StreamsException.class, () -> topic.pipeInput(KEY, VALUE)); final String msg = se.getMessage(); assertTrue(msg.contains("ClassCastException"), "Error about class cast with serdes"); assertTrue(msg.contains("Serdes"), "Error about class cast with serdes"); @@ -155,8 +265,8 @@ public void testTopologyLevelConfigException() { final Topology topology = builder.build(); final StreamsException se = assertThrows(StreamsException.class, () -> new TopologyTestDriver(topology)); - assertThat(se.getMessage(), containsString("Failed to initialize key serdes for source node")); - assertThat(se.getCause().getMessage(), containsString("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + assertTrue(se.getMessage().contains("Failed to initialize key serdes for source node")); + assertTrue(se.getCause().getMessage().contains("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); } private static class ClassCastProcessor extends ExceptionalProcessor { @@ -182,11 +292,66 @@ public void testTopologyLevelClassCastExceptionDirect() { node.init(context); final StreamsException se = assertThrows( StreamsException.class, - () -> node.process(new Record<>("aKey", "aValue", 0)) + () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)) ); - assertThat(se.getCause(), instanceOf(ClassCastException.class)); - assertThat(se.getMessage(), containsString("default Serdes")); - assertThat(se.getMessage(), containsString("input types")); - assertThat(se.getMessage(), containsString("pname")); + assertTrue(se.getCause() instanceof ClassCastException); + assertTrue(se.getMessage().contains("default Serdes")); + assertTrue(se.getMessage().contains("input types")); + assertTrue(se.getMessage().contains("pname")); + } + + @SuppressWarnings("unchecked") + private InternalProcessorContext mockInternalProcessorContext() { + final InternalProcessorContext internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT)); + + when(internalProcessorContext.taskId()).thenReturn(TASK_ID); + when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime())); + when(internalProcessorContext.topic()).thenReturn(TOPIC); + when(internalProcessorContext.partition()).thenReturn(PARTITION); + when(internalProcessorContext.offset()).thenReturn(OFFSET); + when(internalProcessorContext.recordContext()).thenReturn( + new ProcessorRecordContext( + TIMESTAMP, + OFFSET, + PARTITION, + TOPIC, + new RecordHeaders(), + new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, KEY.getBytes(), VALUE.getBytes()))); + when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME)); + + return 
internalProcessorContext; + } + + public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler { + private final ProcessingExceptionHandler.ProcessingHandlerResponse response; + private final InternalProcessorContext internalProcessorContext; + + public ProcessingExceptionHandlerMock(final ProcessingExceptionHandler.ProcessingHandlerResponse response, + final InternalProcessorContext internalProcessorContext) { + this.response = response; + this.internalProcessorContext = internalProcessorContext; + } + + @Override + public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + assertEquals(internalProcessorContext.topic(), context.topic()); + assertEquals(internalProcessorContext.partition(), context.partition()); + assertEquals(internalProcessorContext.offset(), context.offset()); + assertArrayEquals(internalProcessorContext.recordContext().rawRecord().key(), context.sourceRawKey()); + assertArrayEquals(internalProcessorContext.recordContext().rawRecord().value(), context.sourceRawValue()); + assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); + assertEquals(internalProcessorContext.taskId(), context.taskId()); + assertEquals(KEY, record.key()); + assertEquals(VALUE, record.value()); + assertTrue(exception instanceof RuntimeException); + assertEquals("Processing exception should be caught and handled by the processing exception handler.", exception.getMessage()); + + return response; + } + + @Override + public void configure(final Map configs) { + // No-op + } } }
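
Usage note (not part of the patch): the diff above wires a configurable ProcessingExceptionHandler into ProcessorNode. On a processing error the node builds a DefaultErrorHandlerContext, asks the handler for a response, and either records the drop via the dropped-records sensor (CONTINUE) or throws FailedProcessingException (FAIL), which StreamTask unwraps into a StreamsException. Below is a minimal sketch of a handler an application could register. The class name and logging behavior are hypothetical; only the ProcessingExceptionHandler interface, its ProcessingHandlerResponse enum, and the PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG key are taken from this patch.

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical handler: log the failed record's metadata, then drop it and keep processing.
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        System.err.printf("Dropping record: topic=%s, partition=%d, offset=%d, node=%s, cause=%s%n",
            context.topic(), context.partition(), context.offset(),
            context.processorNodeId(), exception.getMessage());
        // CONTINUE makes ProcessorNode count the record via the dropped-records sensor;
        // FAIL would make it throw FailedProcessingException instead.
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No-op
    }
}

Registering the handler mirrors what the integration test does with its mock handlers:

final Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueProcessingExceptionHandler.class);

With a FAIL response, the original cause is surfaced inside a StreamsException rather than the wrapper, as exercised by shouldFailWhenProcessingExceptionOccurs above.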