From 8d5d266486d000e2c91f5a75b12e932e571912ca Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 24 Jun 2024 12:39:30 +0200 Subject: [PATCH 1/5] KAFKA-16448 Add Error Handler Context in Deserialization Exception Handler Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../DeserializationExceptionHandler.java | 26 +++- .../LogAndContinueExceptionHandler.java | 14 ++ .../errors/LogAndFailExceptionHandler.java | 14 ++ .../internals/DefaultErrorHandlerContext.java | 19 +++ .../internals/RecordDeserializer.java | 26 +++- .../internals/RecordDeserializerTest.java | 134 +++++++++++++++++- 6 files changed, 217 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 95ccfeced8e43..56d07df4b5dfa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.streams.errors; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.ProcessorContext; /** @@ -37,11 +37,27 @@ public interface DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception + * @deprecated Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} + */ + @Deprecated + default DeserializationHandlerResponse handle(final ProcessorContext context, + final ConsumerRecord record, + final Exception exception) { + throw new UnsupportedOperationException(); + } + + /** + * Inspect a record and the exception received. + * + * @param context error handler context + * @param record record that failed deserialization + * @param exception the actual exception */ - @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. - DeserializationHandlerResponse handle(final ProcessorContext context, - final ConsumerRecord record, - final Exception exception); + default DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { + return handle(((DefaultErrorHandlerContext) context).processorContext(), record, exception); + } /** * Enumeration that describes the response from the exception handler. 
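Note on the new API surface above: here is a minimal sketch of a user-defined handler written against the new ErrorHandlerContext overload. Only DeserializationExceptionHandler, ErrorHandlerContext, and DeserializationHandlerResponse come from this patch; the class name, package, and log wording are hypothetical. Because the old ProcessorContext-based handle() now has a throwing default implementation, a new handler only needs to override the ErrorHandlerContext variant.

package org.example.streams; // hypothetical package, not part of this patch

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * Hypothetical handler showing the new overload: record metadata is read from the
 * ErrorHandlerContext instead of the deprecated ProcessorContext, and the bad record is skipped.
 */
public class LogAndSkipHandler implements DeserializationExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndSkipHandler.class);

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // taskId comes from the error handler context; topic/partition/offset from the raw record
        log.warn("Skipping record that failed deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
                 context.taskId(), record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}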
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java index a468be2e67a9c..a93b7c99517c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -32,6 +32,7 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class); + @Deprecated @Override public DeserializationHandlerResponse handle(final ProcessorContext context, final ConsumerRecord record, @@ -45,6 +46,19 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, return DeserializationHandlerResponse.CONTINUE; } + @Override + public DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { + + log.warn("Exception caught during Deserialization, " + + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), + exception); + + return DeserializationHandlerResponse.CONTINUE; + } + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java index 3f0fd48652008..5fdda623bdd2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java @@ -33,6 +33,7 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class); @Override + @Deprecated public DeserializationHandlerResponse handle(final ProcessorContext context, final ConsumerRecord record, final Exception exception) { @@ -45,6 +46,19 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, return DeserializationHandlerResponse.FAIL; } + @Override + public DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { + + log.error("Exception caught during Deserialization, " + + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), + exception); + + return DeserializationHandlerResponse.FAIL; + } + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index ff79860d77e30..7ec8069d018dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; /** @@ -32,6 +33,7 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { 
private final byte[] sourceRawValue; private final String processorNodeId; private final TaskId taskId; + private ProcessorContext processorContext; public DefaultErrorHandlerContext(final String topic, final int partition, @@ -51,6 +53,19 @@ public DefaultErrorHandlerContext(final String topic, this.taskId = taskId; } + public DefaultErrorHandlerContext(final ProcessorContext processorContext, + final String topic, + final int partition, + final long offset, + final Headers headers, + final byte[] sourceRawKey, + final byte[] sourceRawValue, + final String processorNodeId, + final TaskId taskId) { + this(topic, partition, offset, headers, sourceRawKey, sourceRawValue, processorNodeId, taskId); + this.processorContext = processorContext; + } + @Override public String topic() { return topic; @@ -90,4 +105,8 @@ public String processorNodeId() { public TaskId taskId() { return taskId; } + + public ProcessorContext processorContext() { + return this.processorContext; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 2680c179b5453..23949a3c64639 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.slf4j.Logger; @@ -69,7 +70,7 @@ ConsumerRecord deserialize(final ProcessorContext processo Optional.empty() ); } catch (final Exception deserializationException) { - handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor); + handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name()); return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null' } } @@ -80,12 +81,29 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa final ConsumerRecord rawRecord, final Logger log, final Sensor droppedRecordsSensor) { + handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null); + } + + public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler, + final ProcessorContext processorContext, + final Exception deserializationException, + final ConsumerRecord rawRecord, + final Logger log, + final Sensor droppedRecordsSensor, + final String sourceNodeName) { final DeserializationExceptionHandler.DeserializationHandlerResponse response; try { - response = deserializationExceptionHandler.handle( + final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( (InternalProcessorContext) processorContext, - rawRecord, - deserializationException); + rawRecord.topic(), + rawRecord.partition(), + rawRecord.offset(), + rawRecord.headers(), + rawRecord.key(), + rawRecord.value(), + sourceNodeName, + 
processorContext.taskId()); + response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException); } catch (final Exception fatalUserException) { log.error( "Deserialization error callback failed after deserialization error for record {}", diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index ac973c1789dd3..11a7006d4b8d9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -24,16 +24,29 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.test.InternalMockProcessorContext; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import java.util.Map; import java.util.Optional; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; public class RecordDeserializerTest { - - private final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())}); + private final String sourceNodeName = "source-node"; + private final TaskId taskId = new TaskId(0, 0); + private final RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); private final ConsumerRecord rawRecord = new ConsumerRecord<>("topic", 1, 1, @@ -46,13 +59,17 @@ public class RecordDeserializerTest { headers, Optional.empty()); + private final InternalProcessorContext context = new InternalMockProcessorContext<>(); + @Test public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() { final RecordDeserializer recordDeserializer = new RecordDeserializer( new TheSourceNode( + sourceNodeName, false, false, - "key", "value" + "key", + "value" ), null, new LogContext(), @@ -69,17 +86,82 @@ public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() { assertEquals(rawRecord.headers(), record.headers()); } + @ParameterizedTest + @CsvSource({ + "true, true", + "true, false", + "false, true", + }) + public void shouldThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithFail(final boolean keyThrowsException, + final boolean valueThrowsException) { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + sourceNodeName, + keyThrowsException, + valueThrowsException, + "key", + "value" + ), + new DeserializationExceptionHandlerMock( + DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, + rawRecord, + sourceNodeName, + taskId + ), + new LogContext(), + new Metrics().sensor("dropped-records") + ); + + final StreamsException e = assertThrows(StreamsException.class, () -> recordDeserializer.deserialize(context, 
rawRecord)); + assertEquals(e.getMessage(), "Deserialization exception handler is set " + + "to fail upon a deserialization error. " + + "If you would rather have the streaming pipeline " + + "continue after a deserialization error, please set the " + + "default.deserialization.exception.handler appropriately."); + } + + @ParameterizedTest + @CsvSource({ + "true, true", + "true, false", + "false, true" + }) + public void shouldNotThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithContinue(final boolean keyThrowsException, + final boolean valueThrowsException) { + final RecordDeserializer recordDeserializer = new RecordDeserializer( + new TheSourceNode( + sourceNodeName, + keyThrowsException, + valueThrowsException, + "key", + "value" + ), + new DeserializationExceptionHandlerMock( + DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE, + rawRecord, + sourceNodeName, + taskId + ), + new LogContext(), + new Metrics().sensor("dropped-records") + ); + + final ConsumerRecord record = recordDeserializer.deserialize(context, rawRecord); + assertNull(record); + } + static class TheSourceNode extends SourceNode { private final boolean keyThrowsException; private final boolean valueThrowsException; private final Object key; private final Object value; - TheSourceNode(final boolean keyThrowsException, + TheSourceNode(final String name, + final boolean keyThrowsException, final boolean valueThrowsException, final Object key, final Object value) { - super("", null, null); + super(name, null, null); this.keyThrowsException = keyThrowsException; this.valueThrowsException = valueThrowsException; this.key = key; @@ -89,7 +171,7 @@ static class TheSourceNode extends SourceNode { @Override public Object deserializeKey(final String topic, final Headers headers, final byte[] data) { if (keyThrowsException) { - throw new RuntimeException(); + throw new RuntimeException("KABOOM!"); } return key; } @@ -97,10 +179,48 @@ public Object deserializeKey(final String topic, final Headers headers, final by @Override public Object deserializeValue(final String topic, final Headers headers, final byte[] data) { if (valueThrowsException) { - throw new RuntimeException(); + throw new RuntimeException("KABOOM!"); } return value; } } + public static class DeserializationExceptionHandlerMock implements DeserializationExceptionHandler { + private final DeserializationHandlerResponse response; + private final ConsumerRecord expectedRecord; + private final String expectedProcessorNodeId; + private final TaskId expectedTaskId; + + public DeserializationExceptionHandlerMock(final DeserializationHandlerResponse response, + final ConsumerRecord record, + final String processorNodeId, + final TaskId taskId) { + this.response = response; + this.expectedRecord = record; + this.expectedProcessorNodeId = processorNodeId; + this.expectedTaskId = taskId; + } + + @Override + public DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { + assertEquals(expectedRecord.topic(), context.topic()); + assertEquals(expectedRecord.partition(), context.partition()); + assertEquals(expectedRecord.offset(), context.offset()); + assertArrayEquals(expectedRecord.key(), context.sourceRawKey()); + assertArrayEquals(expectedRecord.value(), context.sourceRawValue()); + assertEquals(expectedProcessorNodeId, context.processorNodeId()); + assertEquals(expectedTaskId, context.taskId()); + assertEquals(expectedRecord, record); + 
assertInstanceOf(RuntimeException.class, exception); + assertEquals("KABOOM!", exception.getMessage()); + return response; + } + + @Override + public void configure(final Map configs) { + // do nothing + } + } } From 7afd77f7113f2f2434245df1618355dd37ccd61f Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 11:10:10 +0200 Subject: [PATCH 2/5] KAFKA-16448 Add Error Handler Context in Deserialization Exception Handler Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../DeserializationExceptionHandler.java | 4 ++-- .../internals/DefaultErrorHandlerContext.java | 21 ++++++------------- .../processor/internals/ProcessorNode.java | 1 + 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 56d07df4b5dfa..0d64611de678d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -37,7 +37,7 @@ public interface DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception - * @deprecated Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} + * @deprecated Since 3.9. Use Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} */ @Deprecated default DeserializationHandlerResponse handle(final ProcessorContext context, @@ -56,7 +56,7 @@ default DeserializationHandlerResponse handle(final ProcessorContext context, default DeserializationHandlerResponse handle(final ErrorHandlerContext context, final ConsumerRecord record, final Exception exception) { - return handle(((DefaultErrorHandlerContext) context).processorContext(), record, exception); + return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index 7ec8069d018dc..432e04efde793 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -21,6 +21,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; +import java.util.Optional; + /** * Default implementation of {@link ErrorHandlerContext} that provides access to the metadata of the record that caused the error. 
*/ @@ -35,7 +37,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { private final TaskId taskId; private ProcessorContext processorContext; - public DefaultErrorHandlerContext(final String topic, + public DefaultErrorHandlerContext(final ProcessorContext processorContext, + final String topic, final int partition, final long offset, final Headers headers, @@ -51,18 +54,6 @@ public DefaultErrorHandlerContext(final String topic, this.sourceRawValue = sourceRawValue; this.processorNodeId = processorNodeId; this.taskId = taskId; - } - - public DefaultErrorHandlerContext(final ProcessorContext processorContext, - final String topic, - final int partition, - final long offset, - final Headers headers, - final byte[] sourceRawKey, - final byte[] sourceRawValue, - final String processorNodeId, - final TaskId taskId) { - this(topic, partition, offset, headers, sourceRawKey, sourceRawValue, processorNodeId, taskId); this.processorContext = processorContext; } @@ -106,7 +97,7 @@ public TaskId taskId() { return taskId; } - public ProcessorContext processorContext() { - return this.processorContext; + public Optional processorContext() { + return Optional.ofNullable(processorContext); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index eabf9e3d5c471..122571787d5db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -205,6 +205,7 @@ public void process(final Record record) { throw e; } catch (final Exception e) { final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + null, internalProcessorContext.topic(), internalProcessorContext.partition(), internalProcessorContext.offset(), From 2ebb123b0557d7747b4742da1916e439d7f7679b Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 11:20:36 +0200 Subject: [PATCH 3/5] KAFKA-16448 Add Error Handler Context in Deserialization Exception Handler remove rawKey and rawValue Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../streams/errors/ErrorHandlerContext.java | 28 ------------------- .../internals/DefaultErrorHandlerContext.java | 16 ----------- .../processor/internals/ProcessorNode.java | 2 -- .../internals/RecordDeserializer.java | 2 -- ...essingExceptionHandlerIntegrationTest.java | 2 -- .../internals/ProcessorNodeTest.java | 3 -- .../internals/RecordDeserializerTest.java | 3 -- 7 files changed, 56 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java index 0c50547549027..6c5e4f19596c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java @@ -89,34 +89,6 @@ public interface ErrorHandlerContext { */ Headers headers(); - /** - * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. - * - *

<p> If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent - * to the repartition topic. - * - * @return the raw byte of the key of the source message - */ - byte[] sourceRawKey(); - - /** - * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. - * - * <p> If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - * <p>
If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent - * to the repartition topic. - * - * @return the raw byte of the value of the source message - */ - byte[] sourceRawValue(); - /** * Return the current processor node ID. * diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index 432e04efde793..c907ff3eb89ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -31,8 +31,6 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { private final int partition; private final long offset; private final Headers headers; - private final byte[] sourceRawKey; - private final byte[] sourceRawValue; private final String processorNodeId; private final TaskId taskId; private ProcessorContext processorContext; @@ -42,16 +40,12 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext, final int partition, final long offset, final Headers headers, - final byte[] sourceRawKey, - final byte[] sourceRawValue, final String processorNodeId, final TaskId taskId) { this.topic = topic; this.partition = partition; this.offset = offset; this.headers = headers; - this.sourceRawKey = sourceRawKey; - this.sourceRawValue = sourceRawValue; this.processorNodeId = processorNodeId; this.taskId = taskId; this.processorContext = processorContext; @@ -77,16 +71,6 @@ public Headers headers() { return headers; } - @Override - public byte[] sourceRawKey() { - return sourceRawKey; - } - - @Override - public byte[] sourceRawValue() { - return sourceRawValue; - } - @Override public String processorNodeId() { return processorNodeId; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 122571787d5db..65eec47cb1d3a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -210,8 +210,6 @@ public void process(final Record record) { internalProcessorContext.partition(), internalProcessorContext.offset(), internalProcessorContext.headers(), - internalProcessorContext.recordContext().rawRecord().key(), - internalProcessorContext.recordContext().rawRecord().value(), internalProcessorContext.currentNode().name(), internalProcessorContext.taskId()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 23949a3c64639..8ee2dc014ebc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -99,8 +99,6 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa rawRecord.partition(), rawRecord.offset(), rawRecord.headers(), - rawRecord.key(), - rawRecord.value(), sourceNodeName, processorContext.taskId()); response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index bb29ac64f02e0..6c1a64344ec76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -176,8 +176,6 @@ public void configure(final Map configs) { } private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerContext context, final Record record, final Exception exception) { - assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains(new String(context.sourceRawKey()))); - assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains(new String(context.sourceRawValue()))); assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String) record.key())); assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value())); assertEquals("TOPIC_NAME", context.topic()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 6be033cb2dc08..b8d0d2a21bc4d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -58,7 +58,6 @@ import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -337,8 +336,6 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa assertEquals(internalProcessorContext.topic(), context.topic()); assertEquals(internalProcessorContext.partition(), context.partition()); assertEquals(internalProcessorContext.offset(), context.offset()); - assertArrayEquals(internalProcessorContext.recordContext().rawRecord().key(), context.sourceRawKey()); - assertArrayEquals(internalProcessorContext.recordContext().rawRecord().value(), context.sourceRawValue()); assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); assertEquals(internalProcessorContext.taskId(), context.taskId()); assertEquals(KEY, record.key()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index 11a7006d4b8d9..23f364fc6a317 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Optional; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; @@ -208,8 +207,6 @@ public DeserializationHandlerResponse handle(final ErrorHandlerContext context, assertEquals(expectedRecord.topic(), 
context.topic()); assertEquals(expectedRecord.partition(), context.partition()); assertEquals(expectedRecord.offset(), context.offset()); - assertArrayEquals(expectedRecord.key(), context.sourceRawKey()); - assertArrayEquals(expectedRecord.value(), context.sourceRawValue()); assertEquals(expectedProcessorNodeId, context.processorNodeId()); assertEquals(expectedTaskId, context.taskId()); assertEquals(expectedRecord, record); From c5fecd7ac20757815001308130dbfdec50749aee Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 15:31:15 +0200 Subject: [PATCH 4/5] KAFKA-16448 Add Error Handler Context in Deserialization Exception Handler remove rawKey and rawValue Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../internals/GlobalStateManagerImpl.java | 3 +-- .../internals/GlobalStateUpdateTask.java | 3 +-- .../processor/internals/ProcessorAdapter.java | 3 +-- .../internals/ProcessorContextImpl.java | 3 +-- .../internals/ProcessorRecordContext.java | 18 +----------------- .../streams/processor/internals/SinkNode.java | 3 +-- .../processor/internals/StreamTask.java | 6 ++---- .../processor/internals/ProcessorNodeTest.java | 4 +--- 8 files changed, 9 insertions(+), 34 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index b1263ddc58df6..6b7214a9ed185 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -307,8 +307,7 @@ private void reprocessState(final List topicPartitions, record.offset(), record.partition(), record.topic(), - record.headers(), - record); + record.headers()); globalProcessorContext.setRecordContext(recordContext); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 1713efb52a9bd..12a6beedbcd98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -113,8 +113,7 @@ public void update(final ConsumerRecord record) { deserialized.offset(), deserialized.partition(), deserialized.topic(), - deserialized.headers(), - record); + deserialized.headers()); processorContext.setRecordContext(recordContext); processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode()); final Record toProcess = new Record<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index a5d88f5a7f8a9..79db3847cfb06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -66,8 +66,7 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers(), - processorRecordContext.rawRecord() + record.headers() )); delegate.process(record.key(), record.value()); } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 6a79434b622ab..b484d26f0fe87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -261,8 +261,7 @@ public void forward(final Record record, final String childName) { recordContext.offset(), recordContext.partition(), recordContext.topic(), - record.headers(), - recordContext.rawRecord()); + record.headers()); } if (childName == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 3d1ce0529e678..839baaad87528 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; @@ -38,28 +37,17 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata { private final String topic; private final int partition; private final Headers headers; - private final ConsumerRecord rawRecord; public ProcessorRecordContext(final long timestamp, final long offset, final int partition, final String topic, final Headers headers) { - this(timestamp, offset, partition, topic, headers, null); - } - - public ProcessorRecordContext(final long timestamp, - final long offset, - final int partition, - final String topic, - final Headers headers, - final ConsumerRecord rawRecord) { this.timestamp = timestamp; this.offset = offset; this.topic = topic; this.partition = partition; this.headers = Objects.requireNonNull(headers); - this.rawRecord = rawRecord; } @Override @@ -87,10 +75,6 @@ public Headers headers() { return headers; } - public ConsumerRecord rawRecord() { - return rawRecord; - } - public long residentMemorySizeEstimate() { long size = 0; size += Long.BYTES; // value.context.timestamp @@ -189,7 +173,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) { headers = new RecordHeaders(headerArr); } - return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null); + return new ProcessorRecordContext(timestamp, offset, partition, topic, headers); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 871cb2284ee4d..6e79616d30a9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -88,8 +88,7 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers(), - context.recordContext().rawRecord() + record.headers() ); final String topic = topicExtractor.extract(key, value, contextForExtraction); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 
30b9038aa6a67..8cbe5780b90bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -844,8 +844,7 @@ private void doProcess(final long wallClockTime) { record.offset(), record.partition(), record.topic(), - record.headers(), - record.rawRecord() + record.headers() ); updateProcessorContext(currNode, wallClockTime, recordContext); @@ -906,8 +905,7 @@ public void punctuate(final ProcessorNode node, -1L, -1, null, - new RecordHeaders(), - null + new RecordHeaders() ); updateProcessorContext(node, time.milliseconds(), recordContext); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index b8d0d2a21bc4d..df3f927686362 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -314,8 +313,7 @@ private InternalProcessorContext mockInternalProcessorContext() OFFSET, PARTITION, TOPIC, - new RecordHeaders(), - new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, KEY.getBytes(), VALUE.getBytes()))); + new RecordHeaders())); when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME)); return internalProcessorContext; From afcbadd052c48dc2243b4ddc2b92937409021738 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 16:20:14 +0200 Subject: [PATCH 5/5] KAFKA-16448 Add Error Handler Context in Deserialization Exception Handler Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../processor/internals/CorruptedRecord.java | 2 +- .../processor/internals/RecordQueue.java | 2 +- .../processor/internals/StampedRecord.java | 18 +----------------- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java index 1bc8cb51092c6..d31a29883cabe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java @@ -28,7 +28,7 @@ public class CorruptedRecord extends StampedRecord { CorruptedRecord(final ConsumerRecord rawRecord) { - super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord); + super(rawRecord, ConsumerRecord.NO_TIMESTAMP); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index a3c9ea67f067d..a6b30a07ef96d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -230,7 +230,7 @@ private void updateHead() { droppedRecordsSensor.record(); continue; } - headRecord = new StampedRecord(deserialized, timestamp, raw); + headRecord = new StampedRecord(deserialized, timestamp); 
headRecordSizeInBytes = consumerRecordSizeInBytes(raw); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java index d82cd98ed7eec..71e3ca2e3ceca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java @@ -20,11 +20,9 @@ import org.apache.kafka.common.header.Headers; public class StampedRecord extends Stamped> { - private final ConsumerRecord rawRecord; - public StampedRecord(final ConsumerRecord record, final long timestamp, final ConsumerRecord rawRecord) { + public StampedRecord(final ConsumerRecord record, final long timestamp) { super(record, timestamp); - this.rawRecord = rawRecord; } public String topic() { @@ -51,20 +49,6 @@ public Headers headers() { return value.headers(); } - public ConsumerRecord rawRecord() { - return rawRecord; - } - - @Override - public boolean equals(final Object other) { - return super.equals(other); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - @Override public String toString() { return value.toString() + ", timestamp = " + timestamp;
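Taken together, the series changes how handlers receive record metadata, not how they are wired into an application. A minimal configuration sketch follows, assuming the standard StreamsConfig keys (application id, bootstrap servers, and the topology below are placeholders; default.deserialization.exception.handler is the property named in the FAIL error message above):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

import java.util.Properties;

public class ErrorHandlingConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "deserialization-handler-demo"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");            // placeholder broker
        // Handlers are still registered through default.deserialization.exception.handler;
        // the built-in LogAndContinue/LogAndFail handlers now implement the ErrorHandlerContext
        // overload, and older ProcessorContext-based handlers keep working via the deprecated bridge.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start(); // streams.close() should be called on shutdown
    }
}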