diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 763edc9a04500..175c9e104efbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -213,9 +213,13 @@ public void process(final Record record) { internalProcessorContext.currentNode().name(), internalProcessorContext.taskId()); - final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler - .handle(errorHandlerContext, record, e); + final ProcessingExceptionHandler.ProcessingHandlerResponse response; + try { + response = processingExceptionHandler.handle(errorHandlerContext, record, e); + } catch (final Exception fatalUserException) { + throw new FailedProcessingException(fatalUserException); + } if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { log.error("Processing exception handler is set to fail upon" + " a processing error. If you would rather have the streaming pipeline" + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index 6c1a64344ec76..61b5ed16bb1ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -45,10 +45,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -149,9 +151,150 @@ public void shouldContinueWhenProcessingExceptionOccurs() { } } + @Test + public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventError = new KeyValue<>("ID123-2-ERR", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); + assertTrue(e.getMessage().contains("Exception caught in process. " + + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, " + + "partition=0, offset=1, stacktrace=java.lang.RuntimeException: " + + "Exception should be handled by processing exception handler")); + assertFalse(isExecuted.get()); + } + } + + @Test + public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinueProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventFalse = new KeyValue<>("ID123-2-ERR", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + inputTopic.pipeInput(eventFalse.key, eventFalse.value, Instant.EPOCH); + assertFalse(isExecuted.get()); + } + } + + @Test + public void shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); + assertEquals("KABOOM!", e.getCause().getMessage()); + assertFalse(isExecuted.get()); + } + } + + @Test + public void shouldStopProcessingWhenFatalUserExceptionInContinueProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); + assertEquals("KABOOM!", e.getCause().getMessage()); + assertFalse(isExecuted.get()); + } + } + public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { @Override public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + if (((String) record.key()).contains("FATAL")) { + throw new RuntimeException("KABOOM!"); + } assertProcessingExceptionHandlerInputs(context, record, exception); return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE; } @@ -165,6 +308,9 @@ public void configure(final Map configs) { public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { @Override public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + if (((String) record.key()).contains("FATAL")) { + throw new RuntimeException("KABOOM!"); + } assertProcessingExceptionHandlerInputs(context, record, exception); return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index df3f927686362..9fe9244e0aa1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -60,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -100,7 +101,7 @@ public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRe new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); - node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext)); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext, false)); final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); @@ -116,7 +117,7 @@ public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandle new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); - node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext)); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, false)); assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); } @@ -146,6 +147,21 @@ public void shouldNotHandleInternalExceptionsThrownDuringProcessing(final String verify(processingExceptionHandler, never()).handle(any(), any(), any()); } + @Test + public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() { + final ProcessorNode node = + new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); + + final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, true)); + + final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, + () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); + + assertInstanceOf(RuntimeException.class, failedProcessingException.getCause()); + assertEquals("KABOOM!", failedProcessingException.getCause().getMessage()); + } + private static class ExceptionalProcessor implements Processor { @Override public void init(final ProcessorContext context) { @@ -323,10 +339,14 @@ public static class ProcessingExceptionHandlerMock implements ProcessingExceptio private final ProcessingExceptionHandler.ProcessingHandlerResponse response; private final InternalProcessorContext internalProcessorContext; + private final boolean shouldThrowException; + public ProcessingExceptionHandlerMock(final ProcessingExceptionHandler.ProcessingHandlerResponse response, - final InternalProcessorContext internalProcessorContext) { + final InternalProcessorContext internalProcessorContext, + final boolean shouldThrowException) { this.response = response; this.internalProcessorContext = internalProcessorContext; + this.shouldThrowException = shouldThrowException; } @Override @@ -338,9 +358,12 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa assertEquals(internalProcessorContext.taskId(), context.taskId()); assertEquals(KEY, record.key()); assertEquals(VALUE, record.value()); - assertTrue(exception instanceof RuntimeException); + assertInstanceOf(RuntimeException.class, exception); assertEquals("Processing exception should be caught and handled by the processing exception handler.", exception.getMessage()); + if (shouldThrowException) { + throw new RuntimeException("KABOOM!"); + } return response; }