From 4f367f74693d97abf46b7127a4ff541913ca73a7 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 24 Jun 2024 11:13:24 +0200 Subject: [PATCH 1/7] KAFKA-16448 Add Error Handler Context for Production Exception Handler Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../DefaultProductionExceptionHandler.java | 8 + .../errors/ProductionExceptionHandler.java | 49 ++++- .../internals/RecordCollectorImpl.java | 185 +++++++++++++----- ...aysContinueProductionExceptionHandler.java | 44 ----- .../internals/RecordCollectorTest.java | 141 +++++++++++-- .../test/InternalMockProcessorContext.java | 8 + 6 files changed, 321 insertions(+), 114 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java index 33a95f4b4452b..0896114cf2866 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java @@ -25,12 +25,20 @@ * happens while attempting to produce result records. 
*/ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { + @Deprecated @Override public ProductionExceptionHandlerResponse handle(final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.FAIL; } + @Override + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { + return ProductionExceptionHandlerResponse.FAIL; + } + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 6ae0170bfc906..8bf6068542b9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -30,9 +30,28 @@ public interface ProductionExceptionHandler extends Configurable { * * @param record The record that failed to produce * @param exception The exception that occurred during production + * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead */ - ProductionExceptionHandlerResponse handle(final ProducerRecord record, - final Exception exception); + @Deprecated + default ProductionExceptionHandlerResponse handle(final ProducerRecord record, + final Exception exception) { + throw new UnsupportedOperationException(); + } + + /** + * Inspect a record that we attempted to produce, and the exception that resulted + * from attempting to produce it and determine whether or not to continue processing. 
+ * + * @param context The error handler context metadata + * @param record The record that failed to produce + * @param exception The exception that occurred during production + */ + @SuppressWarnings("deprecation") + default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { + return handle(record, exception); + } /** * Handles serialization exception and determine if the process should continue. The default implementation is to @@ -40,12 +59,31 @@ ProductionExceptionHandlerResponse handle(final ProducerRecord r * * @param record the record that failed to serialize * @param exception the exception that occurred during serialization + * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead */ + @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.FAIL; } + /** + * Handles serialization exception and determine if the process should continue. The default implementation is to + * fail the process. 
+ * + * @param context the error handler context metadata + * @param record the record that failed to serialize + * @param exception the exception that occurred during serialization + * @param origin the origin of the serialization exception + */ + @SuppressWarnings("deprecation") + default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + return handleSerializationException(record, exception); + } + enum ProductionExceptionHandlerResponse { /* continue processing */ CONTINUE(0, "CONTINUE"), @@ -68,4 +106,11 @@ enum ProductionExceptionHandlerResponse { this.name = name; } } + + enum SerializationExceptionOrigin { + /* serialization exception occurred during serialization of the key */ + KEY, + /* serialization exception occurred during serialization of the value */ + VALUE + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 16d67666ccb22..633a4b9de5788 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -198,57 +199,51 @@ public void send(final String topic, final byte[] valBytes; try { keyBytes = keySerializer.serialize(topic, headers, key); 
- valBytes = valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { - final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); - final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); - throw new StreamsException( - String.format( - "ClassCastException while producing data to topic %s. " + - "A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + - "(key type: %s / value type: %s). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", - topic, - keySerializer.getClass().getName(), - valueSerializer.getClass().getName(), - keyClass, - valueClass), + throw createStreamsExceptionForClassCastException( + topic, + key, + value, + keySerializer, + valueSerializer, exception); } catch (final Exception exception) { - final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); - final ProductionExceptionHandler.ProductionExceptionHandlerResponse response; - - log.debug(String.format("Error serializing record to topic %s", topic), exception); - - try { - response = productionExceptionHandler.handleSerializationException(record, exception); - } catch (final Exception e) { - log.error("Fatal when handling serialization exception", e); - recordSendError(topic, e, null); - return; - } - - if (response == ProductionExceptionHandlerResponse.FAIL) { - throw new StreamsException( - String.format( - "Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", - topic, - partition, - timestamp), - exception - ); - } - - log.warn("Unable to serialize record, continue processing. 
" + - "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", - topic, - partition, - timestamp); - - droppedRecordsSensor.record(); + handleException( + ProductionExceptionHandler.SerializationExceptionOrigin.KEY, + topic, + key, + value, + headers, + partition, + timestamp, + processorNodeId, + context, + exception); + return; + } + try { + valBytes = valueSerializer.serialize(topic, headers, value); + } catch (final ClassCastException exception) { + throw createStreamsExceptionForClassCastException( + topic, + key, + value, + keySerializer, + valueSerializer, + exception); + } catch (final Exception exception) { + handleException( + ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, + topic, + key, + value, + headers, + partition, + timestamp, + processorNodeId, + context, + exception); return; } @@ -285,7 +280,7 @@ public void send(final String topic, topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs()); } } else { - recordSendError(topic, exception, serializedRecord); + recordSendError(topic, exception, serializedRecord, context, processorNodeId); // KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition); @@ -293,7 +288,87 @@ public void send(final String topic, }); } - private void recordSendError(final String topic, final Exception exception, final ProducerRecord serializedRecord) { + private void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin, + final String topic, + final K key, + final V value, + final Headers headers, + final Integer partition, + final Long timestamp, + final String processorNodeId, + final InternalProcessorContext context, + final Exception exception) { + final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); + final ProductionExceptionHandlerResponse 
response; + + log.debug(String.format("Error serializing record to topic %s", topic), exception); + + try { + final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + context.recordContext().topic(), + context.recordContext().partition(), + context.recordContext().offset(), + context.recordContext().headers(), + context.recordContext().rawRecord().key(), + context.recordContext().rawRecord().value(), + processorNodeId, + taskId); + response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin); + } catch (final Exception e) { + log.error("Fatal when handling serialization exception", e); + recordSendError(topic, e, null, context, processorNodeId); + return; + } + + if (response == ProductionExceptionHandlerResponse.FAIL) { + throw new StreamsException( + String.format( + "Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", + topic, + partition, + timestamp), + exception + ); + } + + log.warn("Unable to serialize record, continue processing. " + + "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", + topic, + partition, + timestamp); + + droppedRecordsSensor.record(); + } + + private StreamsException createStreamsExceptionForClassCastException(final String topic, + final K key, + final V value, + final Serializer keySerializer, + final Serializer valueSerializer, + final ClassCastException exception) { + final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); + final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); + return new StreamsException( + String.format( + "ClassCastException while producing data to topic %s. " + + "A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + + "(key type: %s / value type: %s). 
" + + "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + topic, + keySerializer.getClass().getName(), + valueSerializer.getClass().getName(), + keyClass, + valueClass), + exception); + } + + private void recordSendError(final String topic, + final Exception exception, + final ProducerRecord serializedRecord, + final InternalProcessorContext context, + final String processorNodeId) { String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception.toString()); if (isFatalException(exception)) { @@ -314,7 +389,17 @@ private void recordSendError(final String topic, final Exception exception, fina "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors"; sendException.set(new TaskCorruptedException(Collections.singleton(taskId))); } else { - if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { + final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + context.recordContext().topic(), + context.recordContext().partition(), + context.recordContext().offset(), + context.recordContext().headers(), + context.recordContext().rawRecord().key(), + context.recordContext().rawRecord().value(), + processorNodeId, + taskId); + + if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent."; sendException.set(new StreamsException(errorMessage, exception)); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java 
b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java deleted file mode 100644 index be1e98e4a7154..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.errors; - -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.Map; - -/** - * Production exception handler that always instructs streams to continue when an exception - * happens while attempting to produce result records. 
- */ -public class AlwaysContinueProductionExceptionHandler implements ProductionExceptionHandler { - @Override - public ProductionExceptionHandlerResponse handle(final ProducerRecord record, - final Exception exception) { - return ProductionExceptionHandlerResponse.CONTINUE; - } - - @Override - public ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, - final Exception exception) { - return ProductionExceptionHandlerResponse.CONTINUE; - } - - @Override - public void configure(final Map configs) { - // ignore - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 5222759bc5ea1..f3175ddf08a9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -48,8 +48,8 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -93,9 +93,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; 
+import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -1171,11 +1173,11 @@ public void shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultEx topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); final StreamsException thrown = assertThrows( StreamsException.class, - () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner) + () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner) ); assertEquals(exception, thrown.getCause()); assertThat( @@ -1198,7 +1200,7 @@ public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsWithDefaultE topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); assertEquals(exception, thrown.getCause()); @@ -1222,7 +1224,7 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfASendFailsWithDefaultE topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean); assertEquals(exception, thrown.getCause()); @@ -1241,7 +1243,7 @@ public void 
shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueEx logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), streamsMetrics, topology ); @@ -1268,7 +1270,7 @@ public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueE logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), streamsMetrics, topology ); @@ -1292,7 +1294,7 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueE logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), streamsMetrics, topology ); @@ -1314,8 +1316,13 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin final RecordCollector collector = new RecordCollectorImpl( logContext, taskId, - getExceptionalStreamsProducerOnSend(new Exception()), - new AlwaysContinueProductionExceptionHandler(), + getExceptionalStreamsProducerOnSend(new RuntimeException("KABOOM!")), + new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + context, + sinkNodeName, + taskId + ), streamsMetrics, topology ); @@ -1324,7 +1331,7 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) { logCaptureAppender.setThreshold(Level.INFO); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, 
sinkNodeName, context, streamPartitioner); collector.flush(); final List messages = logCaptureAppender.getMessages(); @@ -1348,7 +1355,7 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin )); assertEquals(1.0, metric.metricValue()); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); collector.flush(); collector.closeClean(); } @@ -1365,7 +1372,7 @@ public void shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDef topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); // With default handler which returns FAIL, flush() throws StreamsException with TimeoutException cause, // otherwise it would throw a TaskCorruptedException with null cause @@ -1386,12 +1393,17 @@ public void shouldNotThrowTaskCorruptedExceptionOnUnknownTopicOrPartitionExcepti logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + context, + sinkNodeName, + taskId + ), streamsMetrics, topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); assertDoesNotThrow(collector::flush); } @@ -1527,7 +1539,13 @@ public void shouldThrowStreamsExceptionUsingDefaultExceptionHandler() { @Test public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() { try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) { - final RecordCollector collector = newRecordCollector(new AlwaysContinueProductionExceptionHandler()); + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + context, + sinkNodeName, + taskId, + ProductionExceptionHandler.SerializationExceptionOrigin.KEY + )); collector.initialize(); collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context); @@ -1547,11 +1565,32 @@ public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() { } } + @Test + public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail() { + try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, + context, + sinkNodeName, + taskId, + ProductionExceptionHandler.SerializationExceptionOrigin.VALUE + )); + collector.initialize(); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> collector.send(topic, "key", "val", null, 0, null, stringSerializer, errorSerializer, sinkNodeName, context)); + + assertInstanceOf(RuntimeException.class, exception.getCause()); + assertEquals("KABOOM!", exception.getCause().getMessage()); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void shouldNotCallProductionExceptionHandlerOnClassCastException() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { - final RecordCollector collector = newRecordCollector(new AlwaysContinueProductionExceptionHandler()); + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE)); collector.initialize(); 
assertThat(mockProducer.history().isEmpty(), equalTo(true)); @@ -1637,7 +1676,7 @@ private static class ErrorStringSerializer extends StringSerializer { @Override public byte[] serialize(final String topic, final Headers headers, final String data) { - throw new SerializationException("Not Supported"); + throw new SerializationException("KABOOM!"); } } @@ -1705,4 +1744,70 @@ public byte[] serialize(final String topic, final Headers headers, final String return serialize(topic, data); } } + + public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler { + private final ProductionExceptionHandlerResponse response; + private InternalProcessorContext expectedContext; + private String expectedProcessorNodeId; + private TaskId expectedTaskId; + private SerializationExceptionOrigin expectedSerializationExceptionOrigin; + + public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response) { + this.response = response; + } + + public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response, + final InternalProcessorContext context, + final String processorNodeId, + final TaskId taskId) { + this(response); + this.expectedContext = context; + this.expectedProcessorNodeId = processorNodeId; + this.expectedTaskId = taskId; + } + + public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response, + final InternalProcessorContext context, + final String processorNodeId, + final TaskId taskId, + final SerializationExceptionOrigin origin) { + this(response, context, processorNodeId, taskId); + this.expectedSerializationExceptionOrigin = origin; + } + + @Override + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { + assertInputs(context, exception); + return response; + } + + @Override + public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, 
+ final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + assertInputs(context, exception); + assertEquals(expectedSerializationExceptionOrigin, origin); + return response; + } + + @Override + public void configure(final Map configs) { + // do nothing + } + + private void assertInputs(final ErrorHandlerContext context, final Exception exception) { + assertEquals(expectedContext.recordContext().topic(), context.topic()); + assertEquals(expectedContext.recordContext().partition(), context.partition()); + assertEquals(expectedContext.recordContext().offset(), context.offset()); + assertArrayEquals(expectedContext.recordContext().rawRecord().key(), context.sourceRawKey()); + assertArrayEquals(expectedContext.recordContext().rawRecord().value(), context.sourceRawValue()); + assertEquals(expectedProcessorNodeId, context.processorNodeId()); + assertEquals(expectedTaskId, context.taskId()); + assertInstanceOf(RuntimeException.class, exception); + assertEquals("KABOOM!", exception.getMessage()); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 237d1c665b874..8d1f640580a5a 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -239,6 +239,14 @@ public InternalMockProcessorContext(final File stateDir, appConfigs(), IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, false); + this.recordContext = new ProcessorRecordContext( + 0, + 0, + 0, + "topic", + new RecordHeaders(), + new ConsumerRecord<>("topic", 0, 0, new byte[0], new byte[0]) + ); } @Override From 94242d82b32bebeac786c6b10afa65f0f43e2909 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Wed, 24 Jul 2024 13:45:51 +0200 Subject: [PATCH 2/7] KAFKA-16448 Add Error Handler Context for Production Exception Handler 
Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../internals/RecordCollectorTest.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index f3175ddf08a9b..95445536f82fa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -1578,7 +1578,7 @@ public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProduction collector.initialize(); final StreamsException exception = assertThrows( - StreamsException.class, + StreamsException.class, () -> collector.send(topic, "key", "val", null, 0, null, stringSerializer, errorSerializer, sinkNodeName, context)); assertInstanceOf(RuntimeException.class, exception.getCause()); @@ -1586,6 +1586,27 @@ public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProduction } } + @Test + public void shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionExceptionHandlerRepliesWithFail() { + try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, + context, + sinkNodeName, + taskId, + ProductionExceptionHandler.SerializationExceptionOrigin.KEY + )); + collector.initialize(); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context)); + + assertInstanceOf(RuntimeException.class, exception.getCause()); + assertEquals("KABOOM!", exception.getCause().getMessage()); + } + } + @SuppressWarnings({"unchecked", 
"rawtypes"}) @Test public void shouldNotCallProductionExceptionHandlerOnClassCastException() { From 1fd0a6685691b287642777d6ffdf0c6d7a57b565 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 09:58:48 +0200 Subject: [PATCH 3/7] KAFKA-16448 Add Error Handler Context for Production Exception Handler Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../errors/ProductionExceptionHandler.java | 4 +- .../internals/RecordCollectorImpl.java | 51 +++++++++++-------- .../internals/RecordCollectorTest.java | 30 +++++------ 3 files changed, 46 insertions(+), 39 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 8bf6068542b9b..25aa00f7a9279 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -30,7 +30,7 @@ public interface ProductionExceptionHandler extends Configurable { * * @param record The record that failed to produce * @param exception The exception that occurred during production - * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead + * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. */ @Deprecated default ProductionExceptionHandlerResponse handle(final ProducerRecord record, @@ -59,7 +59,7 @@ default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext cont * * @param record the record that failed to serialize * @param exception the exception that occurred during serialization - * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead + * @deprecated Since 3.9. Use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead.
*/ @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 633a4b9de5788..f8c757f256ac1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -200,12 +200,10 @@ public void send(final String topic, try { keyBytes = keySerializer.serialize(topic, headers, key); } catch (final ClassCastException exception) { - throw createStreamsExceptionForClassCastException( + throw createStreamsExceptionForKeyClassCastException( topic, key, - value, keySerializer, - valueSerializer, exception); } catch (final Exception exception) { handleException( @@ -225,11 +223,9 @@ public void send(final String topic, try { valBytes = valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { - throw createStreamsExceptionForClassCastException( + throw createStreamsExceptionForValueClassCastException( topic, - key, value, - keySerializer, valueSerializer, exception); } catch (final Exception exception) { @@ -339,28 +335,39 @@ private void handleException(final ProductionExceptionHandler.Serializati droppedRecordsSensor.record(); } - - private StreamsException createStreamsExceptionForClassCastException(final String topic, + private StreamsException createStreamsExceptionForKeyClassCastException(final String topic, final K key, - final V value, final Serializer keySerializer, - final Serializer valueSerializer, final ClassCastException exception) { final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); + return new StreamsException( + String.format( + "ClassCastException while producing data to topic %s. 
" + + "The key serializer %s is not compatible to the actual key type: %s. " + + "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + topic, + keySerializer.getClass().getName(), + keyClass), + exception); + } + + private StreamsException createStreamsExceptionForValueClassCastException(final String topic, + final V value, + final Serializer valueSerializer, + final ClassCastException exception) { final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); return new StreamsException( - String.format( - "ClassCastException while producing data to topic %s. " + - "A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + - "(key type: %s / value type: %s). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", - topic, - keySerializer.getClass().getName(), - valueSerializer.getClass().getName(), - keyClass, - valueClass), + String.format( + "ClassCastException while producing data to topic %s. " + + "The value serializer %s is not compatible to the actual value type: %s. 
" + + "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + topic, + valueSerializer.getClass().getName(), + valueClass), exception); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 95445536f82fa..f61e04934a4e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -920,11 +920,11 @@ public void shouldThrowInformativeStreamsExceptionOnKeyClassCastException() { expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. " + - "A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) " + - "is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "The key serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual key type: java.lang.String. " + + "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") - ); + ); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -948,9 +948,9 @@ public void shouldThrowInformativeStreamsExceptionOnKeyAndNullValueClassCastExce expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. 
" + - "A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) " + - "is not compatible to the actual key or value type (key type: java.lang.String / value type: unknown because value is null). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "The key serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual key type: java.lang.String. " + + "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") ); } @@ -976,10 +976,10 @@ public void shouldThrowInformativeStreamsExceptionOnValueClassCastException() { expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. " + - "A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) " + - "is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + "The value serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual value type: java.lang.String. 
" + + "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") ); } @@ -1004,10 +1004,10 @@ public void shouldThrowInformativeStreamsExceptionOnValueAndNullKeyClassCastExce expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. " + - "A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) " + - "is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + "The value serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual value type: java.lang.String. 
" + + "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") ); } From 057464fc12a3973eada68a94750359c02f6d7bf5 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 10:15:12 +0200 Subject: [PATCH 4/7] KAFKA-16448 Add Error Handler Context for Production Exception Handler Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../kafka/streams/processor/internals/RecordCollectorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index f61e04934a4e4..4cb92f396ca7d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -924,7 +924,7 @@ public void shouldThrowInformativeStreamsExceptionOnKeyClassCastException() { "is not compatible to the actual key type: java.lang.String. 
" + "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") - ); + ); } @SuppressWarnings({"unchecked", "rawtypes"}) From 1efb3b27848d0b054f636080fc0fc821ed1abac2 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 10:24:41 +0200 Subject: [PATCH 5/7] KAFKA-16448 Add Error Handler Context for Production Exception Handler remove rawKey and rawValue Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../streams/errors/ErrorHandlerContext.java | 28 ------------------- .../internals/DefaultErrorHandlerContext.java | 16 ----------- .../processor/internals/ProcessorNode.java | 2 -- .../internals/RecordCollectorImpl.java | 4 --- ...essingExceptionHandlerIntegrationTest.java | 2 -- .../internals/ProcessorNodeTest.java | 3 -- .../internals/RecordCollectorTest.java | 3 -- 7 files changed, 58 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java index 0c50547549027..6c5e4f19596c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java @@ -89,34 +89,6 @@ public interface ErrorHandlerContext { */ Headers headers(); - /** - * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. - * - *

If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - *

If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent - * to the repartition topic. - * - * @return the raw byte of the key of the source message - */ - byte[] sourceRawKey(); - - /** - * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. - * - *

If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - *

If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent - * to the repartition topic. - * - * @return the raw byte of the value of the source message - */ - byte[] sourceRawValue(); - /** * Return the current processor node ID. * diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index ff79860d77e30..aa066fb6da96c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -28,8 +28,6 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { private final int partition; private final long offset; private final Headers headers; - private final byte[] sourceRawKey; - private final byte[] sourceRawValue; private final String processorNodeId; private final TaskId taskId; @@ -37,16 +35,12 @@ public DefaultErrorHandlerContext(final String topic, final int partition, final long offset, final Headers headers, - final byte[] sourceRawKey, - final byte[] sourceRawValue, final String processorNodeId, final TaskId taskId) { this.topic = topic; this.partition = partition; this.offset = offset; this.headers = headers; - this.sourceRawKey = sourceRawKey; - this.sourceRawValue = sourceRawValue; this.processorNodeId = processorNodeId; this.taskId = taskId; } @@ -71,16 +65,6 @@ public Headers headers() { return headers; } - @Override - public byte[] sourceRawKey() { - return sourceRawKey; - } - - @Override - public byte[] sourceRawValue() { - return sourceRawValue; - } - @Override public String processorNodeId() { return processorNodeId; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index eabf9e3d5c471..c30d42ba7461d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -209,8 +209,6 @@ public void process(final Record record) { internalProcessorContext.partition(), internalProcessorContext.offset(), internalProcessorContext.headers(), - internalProcessorContext.recordContext().rawRecord().key(), - internalProcessorContext.recordContext().rawRecord().value(), internalProcessorContext.currentNode().name(), internalProcessorContext.taskId()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index f8c757f256ac1..35097153a5fc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -305,8 +305,6 @@ private void handleException(final ProductionExceptionHandler.Serializati context.recordContext().partition(), context.recordContext().offset(), context.recordContext().headers(), - context.recordContext().rawRecord().key(), - context.recordContext().rawRecord().value(), processorNodeId, taskId); response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin); @@ -401,8 +399,6 @@ private void recordSendError(final String topic, context.recordContext().partition(), context.recordContext().offset(), context.recordContext().headers(), - context.recordContext().rawRecord().key(), - context.recordContext().rawRecord().value(), processorNodeId, taskId); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index bb29ac64f02e0..6c1a64344ec76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -176,8 +176,6 @@ public void configure(final Map configs) { } private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerContext context, final Record record, final Exception exception) { - assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains(new String(context.sourceRawKey()))); - assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains(new String(context.sourceRawValue()))); assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String) record.key())); assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value())); assertEquals("TOPIC_NAME", context.topic()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 6be033cb2dc08..b8d0d2a21bc4d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -58,7 +58,6 @@ import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -337,8 +336,6 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa assertEquals(internalProcessorContext.topic(), context.topic()); 
assertEquals(internalProcessorContext.partition(), context.partition()); assertEquals(internalProcessorContext.offset(), context.offset()); - assertArrayEquals(internalProcessorContext.recordContext().rawRecord().key(), context.sourceRawKey()); - assertArrayEquals(internalProcessorContext.recordContext().rawRecord().value(), context.sourceRawValue()); assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); assertEquals(internalProcessorContext.taskId(), context.taskId()); assertEquals(KEY, record.key()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 4cb92f396ca7d..2623102885197 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -93,7 +93,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -1823,8 +1822,6 @@ private void assertInputs(final ErrorHandlerContext context, final Exception exc assertEquals(expectedContext.recordContext().topic(), context.topic()); assertEquals(expectedContext.recordContext().partition(), context.partition()); assertEquals(expectedContext.recordContext().offset(), context.offset()); - assertArrayEquals(expectedContext.recordContext().rawRecord().key(), context.sourceRawKey()); - assertArrayEquals(expectedContext.recordContext().rawRecord().value(), context.sourceRawValue()); assertEquals(expectedProcessorNodeId, 
context.processorNodeId()); assertEquals(expectedTaskId, context.taskId()); assertInstanceOf(RuntimeException.class, exception); From 419155c38f511554702ffe6ee8c72d49753619ce Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 15:54:40 +0200 Subject: [PATCH 6/7] KAFKA-16448 Add Error Handler Context for Production Exception Handler remove rawKey and rawValue Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../internals/GlobalStateManagerImpl.java | 3 +-- .../internals/GlobalStateUpdateTask.java | 3 +-- .../processor/internals/ProcessorAdapter.java | 3 +-- .../internals/ProcessorContextImpl.java | 3 +-- .../internals/ProcessorRecordContext.java | 18 +----------------- .../streams/processor/internals/SinkNode.java | 3 +-- .../processor/internals/StreamTask.java | 6 ++---- .../processor/internals/ProcessorNodeTest.java | 4 +--- .../test/InternalMockProcessorContext.java | 3 +-- 9 files changed, 10 insertions(+), 36 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index b1263ddc58df6..6b7214a9ed185 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -307,8 +307,7 @@ private void reprocessState(final List topicPartitions, record.offset(), record.partition(), record.topic(), - record.headers(), - record); + record.headers()); globalProcessorContext.setRecordContext(recordContext); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 1713efb52a9bd..12a6beedbcd98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java 
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -113,8 +113,7 @@ public void update(final ConsumerRecord record) { deserialized.offset(), deserialized.partition(), deserialized.topic(), - deserialized.headers(), - record); + deserialized.headers()); processorContext.setRecordContext(recordContext); processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode()); final Record toProcess = new Record<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index a5d88f5a7f8a9..79db3847cfb06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -66,8 +66,7 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers(), - processorRecordContext.rawRecord() + record.headers() )); delegate.process(record.key(), record.value()); } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 6a79434b622ab..b484d26f0fe87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -261,8 +261,7 @@ public void forward(final Record record, final String childName) { recordContext.offset(), recordContext.partition(), recordContext.topic(), - record.headers(), - recordContext.rawRecord()); + record.headers()); } if (childName == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 3d1ce0529e678..839baaad87528 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; @@ -38,28 +37,17 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata { private final String topic; private final int partition; private final Headers headers; - private final ConsumerRecord rawRecord; public ProcessorRecordContext(final long timestamp, final long offset, final int partition, final String topic, final Headers headers) { - this(timestamp, offset, partition, topic, headers, null); - } - - public ProcessorRecordContext(final long timestamp, - final long offset, - final int partition, - final String topic, - final Headers headers, - final ConsumerRecord rawRecord) { this.timestamp = timestamp; this.offset = offset; this.topic = topic; this.partition = partition; this.headers = Objects.requireNonNull(headers); - this.rawRecord = rawRecord; } @Override @@ -87,10 +75,6 @@ public Headers headers() { return headers; } - public ConsumerRecord rawRecord() { - return rawRecord; - } - public long residentMemorySizeEstimate() { long size = 0; size += Long.BYTES; // value.context.timestamp @@ -189,7 +173,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) { headers = new RecordHeaders(headerArr); } - return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null); + return new ProcessorRecordContext(timestamp, offset, partition, topic, headers); } @Override diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 871cb2284ee4d..6e79616d30a9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -88,8 +88,7 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers(), - context.recordContext().rawRecord() + record.headers() ); final String topic = topicExtractor.extract(key, value, contextForExtraction); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 30b9038aa6a67..8cbe5780b90bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -844,8 +844,7 @@ private void doProcess(final long wallClockTime) { record.offset(), record.partition(), record.topic(), - record.headers(), - record.rawRecord() + record.headers() ); updateProcessorContext(currNode, wallClockTime, recordContext); @@ -906,8 +905,7 @@ public void punctuate(final ProcessorNode node, -1L, -1, null, - new RecordHeaders(), - null + new RecordHeaders() ); updateProcessorContext(node, time.milliseconds(), recordContext); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index b8d0d2a21bc4d..df3f927686362 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -16,7 +16,6 @@ */ package 
org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -314,8 +313,7 @@ private InternalProcessorContext mockInternalProcessorContext() OFFSET, PARTITION, TOPIC, - new RecordHeaders(), - new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, KEY.getBytes(), VALUE.getBytes()))); + new RecordHeaders())); when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME)); return internalProcessorContext; diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 8d1f640580a5a..ff9080a2de166 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -244,8 +244,7 @@ public InternalMockProcessorContext(final File stateDir, 0, 0, "topic", - new RecordHeaders(), - new ConsumerRecord<>("topic", 0, 0, new byte[0], new byte[0]) + new RecordHeaders() ); } From 902ae9835040b78405c0826862780dd0ad13ad7b Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jul 2024 16:04:40 +0200 Subject: [PATCH 7/7] KAFKA-16448 Add Error Handler Context for Production Exception Handler remove rawKey and rawValue Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../processor/internals/CorruptedRecord.java | 2 +- .../processor/internals/RecordQueue.java | 2 +- .../processor/internals/StampedRecord.java | 18 +----------------- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java index 1bc8cb51092c6..d31a29883cabe 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java @@ -28,7 +28,7 @@ public class CorruptedRecord extends StampedRecord { CorruptedRecord(final ConsumerRecord rawRecord) { - super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord); + super(rawRecord, ConsumerRecord.NO_TIMESTAMP); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index a3c9ea67f067d..a6b30a07ef96d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -230,7 +230,7 @@ private void updateHead() { droppedRecordsSensor.record(); continue; } - headRecord = new StampedRecord(deserialized, timestamp, raw); + headRecord = new StampedRecord(deserialized, timestamp); headRecordSizeInBytes = consumerRecordSizeInBytes(raw); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java index d82cd98ed7eec..71e3ca2e3ceca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java @@ -20,11 +20,9 @@ import org.apache.kafka.common.header.Headers; public class StampedRecord extends Stamped> { - private final ConsumerRecord rawRecord; - public StampedRecord(final ConsumerRecord record, final long timestamp, final ConsumerRecord rawRecord) { + public StampedRecord(final ConsumerRecord record, final long timestamp) { super(record, timestamp); - this.rawRecord = rawRecord; } public String topic() { @@ -51,20 +49,6 @@ public Headers headers() { return 
value.headers(); } - public ConsumerRecord rawRecord() { - return rawRecord; - } - - @Override - public boolean equals(final Object other) { - return super.equals(other); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - @Override public String toString() { return value.toString() + ", timestamp = " + timestamp;