diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java index 33a95f4b4452b..0896114cf2866 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java @@ -25,12 +25,20 @@ * happens while attempting to produce result records. */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { + @Deprecated @Override public ProductionExceptionHandlerResponse handle(final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.FAIL; } + @Override + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { + return ProductionExceptionHandlerResponse.FAIL; + } + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java index 0c50547549027..6c5e4f19596c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java @@ -89,34 +89,6 @@ public interface ErrorHandlerContext { */ Headers headers(); - /** - * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. - * - *
If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - *
If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent - * to the repartition topic. - * - * @return the raw byte of the key of the source message - */ - byte[] sourceRawKey(); - - /** - * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. - * - *
If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - *
If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent - * to the repartition topic. - * - * @return the raw byte of the value of the source message - */ - byte[] sourceRawValue(); - /** * Return the current processor node ID. * diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 6ae0170bfc906..25aa00f7a9279 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -30,9 +30,28 @@ public interface ProductionExceptionHandler extends Configurable { * * @param record The record that failed to produce * @param exception The exception that occurred during production + * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. */ - ProductionExceptionHandlerResponse handle(final ProducerRecord record, - final Exception exception); + @Deprecated + default ProductionExceptionHandlerResponse handle(final ProducerRecord record, + final Exception exception) { + throw new UnsupportedOperationException(); + } + + /** + * Inspect a record that we attempted to produce, and the exception that resulted + * from attempting to produce it and determine whether or not to continue processing. + * + * @param context The error handler context metadata + * @param record The record that failed to produce + * @param exception The exception that occurred during production + */ + @SuppressWarnings("deprecation") + default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { + return handle(record, exception); + } /** * Handles serialization exception and determine if the process should continue. The default implementation is to @@ -40,12 +59,31 @@ ProductionExceptionHandlerResponse handle(final ProducerRecord r * * @param record the record that failed to serialize * @param exception the exception that occurred during serialization + * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. */ + @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.FAIL; } + /** + * Handles serialization exception and determine if the process should continue. The default implementation is to + * fail the process. 
+ * + * @param context the error handler context metadata + * @param record the record that failed to serialize + * @param exception the exception that occurred during serialization + * @param origin the origin of the serialization exception + */ + @SuppressWarnings("deprecation") + default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + return handleSerializationException(record, exception); + } + enum ProductionExceptionHandlerResponse { /* continue processing */ CONTINUE(0, "CONTINUE"), @@ -68,4 +106,11 @@ enum ProductionExceptionHandlerResponse { this.name = name; } } + + enum SerializationExceptionOrigin { + /* serialization exception occurred during serialization of the key */ + KEY, + /* serialization exception occurred during serialization of the value */ + VALUE + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index ff79860d77e30..aa066fb6da96c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -28,8 +28,6 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { private final int partition; private final long offset; private final Headers headers; - private final byte[] sourceRawKey; - private final byte[] sourceRawValue; private final String processorNodeId; private final TaskId taskId; @@ -37,16 +35,12 @@ public DefaultErrorHandlerContext(final String topic, final int partition, final long offset, final Headers headers, - final byte[] sourceRawKey, - final byte[] sourceRawValue, final String processorNodeId, final TaskId taskId) { this.topic = topic; this.partition = partition; this.offset = offset; this.headers = headers; - this.sourceRawKey = sourceRawKey; - this.sourceRawValue = sourceRawValue; this.processorNodeId = processorNodeId; this.taskId = taskId; } @@ -71,16 +65,6 @@ public Headers headers() { return headers; } - @Override - public byte[] sourceRawKey() { - return sourceRawKey; - } - - @Override - public byte[] sourceRawValue() { - return sourceRawValue; - } - @Override public String processorNodeId() { return processorNodeId; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java index 1bc8cb51092c6..d31a29883cabe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java @@ -28,7 +28,7 @@ public class CorruptedRecord extends StampedRecord { CorruptedRecord(final ConsumerRecord rawRecord) { - super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord); + super(rawRecord, ConsumerRecord.NO_TIMESTAMP); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index b1263ddc58df6..6b7214a9ed185 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -307,8 +307,7 @@ private void reprocessState(final List topicPartitions, record.offset(), record.partition(), record.topic(), - record.headers(), - record); + record.headers()); globalProcessorContext.setRecordContext(recordContext); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 1713efb52a9bd..12a6beedbcd98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -113,8 +113,7 @@ public void update(final ConsumerRecord record) { deserialized.offset(), deserialized.partition(), deserialized.topic(), - deserialized.headers(), - record); + deserialized.headers()); processorContext.setRecordContext(recordContext); processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode()); final Record toProcess = new Record<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index a5d88f5a7f8a9..79db3847cfb06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -66,8 +66,7 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers(), - processorRecordContext.rawRecord() + record.headers() )); delegate.process(record.key(), record.value()); } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 6a79434b622ab..b484d26f0fe87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -261,8 +261,7 @@ public void forward(final Record record, final String childName) { recordContext.offset(), recordContext.partition(), recordContext.topic(), - record.headers(), - recordContext.rawRecord()); + record.headers()); } if (childName == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index eabf9e3d5c471..c30d42ba7461d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -209,8 +209,6 @@ public void process(final Record record) { internalProcessorContext.partition(), internalProcessorContext.offset(), internalProcessorContext.headers(), - internalProcessorContext.recordContext().rawRecord().key(), - internalProcessorContext.recordContext().rawRecord().value(), internalProcessorContext.currentNode().name(), internalProcessorContext.taskId()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 
3d1ce0529e678..839baaad87528 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; @@ -38,28 +37,17 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata { private final String topic; private final int partition; private final Headers headers; - private final ConsumerRecord rawRecord; public ProcessorRecordContext(final long timestamp, final long offset, final int partition, final String topic, final Headers headers) { - this(timestamp, offset, partition, topic, headers, null); - } - - public ProcessorRecordContext(final long timestamp, - final long offset, - final int partition, - final String topic, - final Headers headers, - final ConsumerRecord rawRecord) { this.timestamp = timestamp; this.offset = offset; this.topic = topic; this.partition = partition; this.headers = Objects.requireNonNull(headers); - this.rawRecord = rawRecord; } @Override @@ -87,10 +75,6 @@ public Headers headers() { return headers; } - public ConsumerRecord rawRecord() { - return rawRecord; - } - public long residentMemorySizeEstimate() { long size = 0; size += Long.BYTES; // value.context.timestamp @@ -189,7 +173,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) { headers = new RecordHeaders(headerArr); } - return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null); + return new ProcessorRecordContext(timestamp, offset, partition, topic, headers); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 16d67666ccb22..35097153a5fc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -198,57 +199,47 @@ public void send(final String topic, final byte[] valBytes; try { keyBytes = keySerializer.serialize(topic, headers, key); - valBytes = valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { - final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); - final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); - throw new StreamsException( - String.format( - "ClassCastException while producing data to topic %s. " + - "A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + - "(key type: %s / value type: %s). 
" + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", - topic, - keySerializer.getClass().getName(), - valueSerializer.getClass().getName(), - keyClass, - valueClass), + throw createStreamsExceptionForKeyClassCastException( + topic, + key, + keySerializer, exception); } catch (final Exception exception) { - final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); - final ProductionExceptionHandler.ProductionExceptionHandlerResponse response; - - log.debug(String.format("Error serializing record to topic %s", topic), exception); - - try { - response = productionExceptionHandler.handleSerializationException(record, exception); - } catch (final Exception e) { - log.error("Fatal when handling serialization exception", e); - recordSendError(topic, e, null); - return; - } - - if (response == ProductionExceptionHandlerResponse.FAIL) { - throw new StreamsException( - String.format( - "Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", - topic, - partition, - timestamp), - exception - ); - } - - log.warn("Unable to serialize record, continue processing. " + - "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", - topic, - partition, - timestamp); - - droppedRecordsSensor.record(); + handleException( + ProductionExceptionHandler.SerializationExceptionOrigin.KEY, + topic, + key, + value, + headers, + partition, + timestamp, + processorNodeId, + context, + exception); + return; + } + try { + valBytes = valueSerializer.serialize(topic, headers, value); + } catch (final ClassCastException exception) { + throw createStreamsExceptionForValueClassCastException( + topic, + value, + valueSerializer, + exception); + } catch (final Exception exception) { + handleException( + ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, + topic, + key, + value, + headers, + partition, + timestamp, + processorNodeId, + context, + exception); return; } @@ -285,7 +276,7 @@ public void send(final String topic, topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs()); } } else { - recordSendError(topic, exception, serializedRecord); + recordSendError(topic, exception, serializedRecord, context, processorNodeId); // KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition); @@ -293,7 +284,96 @@ public void send(final String topic, }); } - private void recordSendError(final String topic, final Exception exception, final ProducerRecord serializedRecord) { + private void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin, + final String topic, + final K key, + final V value, + final Headers headers, + final Integer partition, + final Long timestamp, + final String processorNodeId, + final InternalProcessorContext context, + final Exception exception) { + final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); + final ProductionExceptionHandlerResponse response; + + log.debug(String.format("Error serializing record to topic %s", topic), exception); + + try { + final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + 
context.recordContext().topic(), + context.recordContext().partition(), + context.recordContext().offset(), + context.recordContext().headers(), + processorNodeId, + taskId); + response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin); + } catch (final Exception e) { + log.error("Fatal when handling serialization exception", e); + recordSendError(topic, e, null, context, processorNodeId); + return; + } + + if (response == ProductionExceptionHandlerResponse.FAIL) { + throw new StreamsException( + String.format( + "Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]", + topic, + partition, + timestamp), + exception + ); + } + + log.warn("Unable to serialize record, continue processing. " + + "ProducerRecord(topic=[{}], partition=[{}], timestamp=[{}])", + topic, + partition, + timestamp); + + droppedRecordsSensor.record(); + } + private StreamsException createStreamsExceptionForKeyClassCastException(final String topic, + final K key, + final Serializer keySerializer, + final ClassCastException exception) { + final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); + return new StreamsException( + String.format( + "ClassCastException while producing data to topic %s. " + + "The key serializer %s is not compatible to the actual key type: %s. " + + "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + topic, + keySerializer.getClass().getName(), + keyClass), + exception); + } + + private StreamsException createStreamsExceptionForValueClassCastException(final String topic, + final V value, + final Serializer valueSerializer, + final ClassCastException exception) { + final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); + return new StreamsException( + String.format( + "ClassCastException while producing data to topic %s. " + + "The value serializer %s is not compatible to the actual value type: %s. 
" + + "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + topic, + valueSerializer.getClass().getName(), + valueClass), + exception); + } + + private void recordSendError(final String topic, + final Exception exception, + final ProducerRecord serializedRecord, + final InternalProcessorContext context, + final String processorNodeId) { String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception.toString()); if (isFatalException(exception)) { @@ -314,7 +394,15 @@ private void recordSendError(final String topic, final Exception exception, fina "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors"; sendException.set(new TaskCorruptedException(Collections.singleton(taskId))); } else { - if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { + final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( + context.recordContext().topic(), + context.recordContext().partition(), + context.recordContext().offset(), + context.recordContext().headers(), + processorNodeId, + taskId); + + if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent."; sendException.set(new StreamsException(errorMessage, exception)); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index a3c9ea67f067d..a6b30a07ef96d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -230,7 +230,7 @@ private void updateHead() { droppedRecordsSensor.record(); continue; } - headRecord = new StampedRecord(deserialized, timestamp, raw); + headRecord = new StampedRecord(deserialized, timestamp); headRecordSizeInBytes = consumerRecordSizeInBytes(raw); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 871cb2284ee4d..6e79616d30a9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -88,8 +88,7 @@ public void process(final Record record) { context.offset(), context.partition(), context.topic(), - record.headers(), - context.recordContext().rawRecord() + record.headers() ); final String topic = topicExtractor.extract(key, value, contextForExtraction); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java index d82cd98ed7eec..71e3ca2e3ceca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java @@ -20,11 +20,9 @@ import org.apache.kafka.common.header.Headers; public class StampedRecord extends Stamped> { - 
private final ConsumerRecord rawRecord; - public StampedRecord(final ConsumerRecord record, final long timestamp, final ConsumerRecord rawRecord) { + public StampedRecord(final ConsumerRecord record, final long timestamp) { super(record, timestamp); - this.rawRecord = rawRecord; } public String topic() { @@ -51,20 +49,6 @@ public Headers headers() { return value.headers(); } - public ConsumerRecord rawRecord() { - return rawRecord; - } - - @Override - public boolean equals(final Object other) { - return super.equals(other); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - @Override public String toString() { return value.toString() + ", timestamp = " + timestamp; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 30b9038aa6a67..8cbe5780b90bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -844,8 +844,7 @@ private void doProcess(final long wallClockTime) { record.offset(), record.partition(), record.topic(), - record.headers(), - record.rawRecord() + record.headers() ); updateProcessorContext(currNode, wallClockTime, recordContext); @@ -906,8 +905,7 @@ public void punctuate(final ProcessorNode node, -1L, -1, null, - new RecordHeaders(), - null + new RecordHeaders() ); updateProcessorContext(node, time.milliseconds(), recordContext); diff --git a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java deleted file mode 100644 index be1e98e4a7154..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.errors; - -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.Map; - -/** - * Production exception handler that always instructs streams to continue when an exception - * happens while attempting to produce result records. 
- */ -public class AlwaysContinueProductionExceptionHandler implements ProductionExceptionHandler { - @Override - public ProductionExceptionHandlerResponse handle(final ProducerRecord record, - final Exception exception) { - return ProductionExceptionHandlerResponse.CONTINUE; - } - - @Override - public ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, - final Exception exception) { - return ProductionExceptionHandlerResponse.CONTINUE; - } - - @Override - public void configure(final Map configs) { - // ignore - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index bb29ac64f02e0..6c1a64344ec76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -176,8 +176,6 @@ public void configure(final Map configs) { } private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerContext context, final Record record, final Exception exception) { - assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains(new String(context.sourceRawKey()))); - assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains(new String(context.sourceRawValue()))); assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String) record.key())); assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value())); assertEquals("TOPIC_NAME", context.topic()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 6be033cb2dc08..df3f927686362 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -58,7 +57,6 @@ import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -315,8 +313,7 @@ private InternalProcessorContext mockInternalProcessorContext() OFFSET, PARTITION, TOPIC, - new RecordHeaders(), - new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, KEY.getBytes(), VALUE.getBytes()))); + new RecordHeaders())); when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME)); return internalProcessorContext; @@ -337,8 +334,6 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa assertEquals(internalProcessorContext.topic(), context.topic()); assertEquals(internalProcessorContext.partition(), context.partition()); assertEquals(internalProcessorContext.offset(), context.offset()); - 
assertArrayEquals(internalProcessorContext.recordContext().rawRecord().key(), context.sourceRawKey()); - assertArrayEquals(internalProcessorContext.recordContext().rawRecord().value(), context.sourceRawValue()); assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); assertEquals(internalProcessorContext.taskId(), context.taskId()); assertEquals(KEY, record.key()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 5222759bc5ea1..2623102885197 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -48,8 +48,8 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -96,6 +96,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -918,9 +919,9 @@ public void shouldThrowInformativeStreamsExceptionOnKeyClassCastException() { expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. " + - "A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) " + - "is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "The key serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual key type: java.lang.String. " + + "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") ); } @@ -946,9 +947,9 @@ public void shouldThrowInformativeStreamsExceptionOnKeyAndNullValueClassCastExce expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. " + - "A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) " + - "is not compatible to the actual key or value type (key type: java.lang.String / value type: unknown because value is null). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "The key serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual key type: java.lang.String. 
" + + "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") ); } @@ -974,10 +975,10 @@ public void shouldThrowInformativeStreamsExceptionOnValueClassCastException() { expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. " + - "A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) " + - "is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + "The value serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual value type: java.lang.String. " + + "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") ); } @@ -1002,10 +1003,10 @@ public void shouldThrowInformativeStreamsExceptionOnValueAndNullKeyClassCastExce expected.getMessage(), equalTo( "ClassCastException while producing data to topic topic. " + - "A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) " + - "is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). " + - "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + "The value serializer org.apache.kafka.common.serialization.LongSerializer " + + "is not compatible to the actual value type: java.lang.String. 
" + + "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") ); } @@ -1171,11 +1172,11 @@ public void shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultEx topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); final StreamsException thrown = assertThrows( StreamsException.class, - () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner) + () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner) ); assertEquals(exception, thrown.getCause()); assertThat( @@ -1198,7 +1199,7 @@ public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsWithDefaultE topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); assertEquals(exception, thrown.getCause()); @@ -1222,7 +1223,7 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfASendFailsWithDefaultE topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean); assertEquals(exception, thrown.getCause()); @@ -1241,7 +1242,7 @@ public void shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueEx logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), streamsMetrics, topology ); @@ -1268,7 +1269,7 @@ public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueE logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), streamsMetrics, topology ); @@ -1292,7 +1293,7 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueE logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE), streamsMetrics, topology ); @@ -1314,8 +1315,13 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin final RecordCollector collector = new RecordCollectorImpl( logContext, taskId, - getExceptionalStreamsProducerOnSend(new Exception()), - new AlwaysContinueProductionExceptionHandler(), + getExceptionalStreamsProducerOnSend(new RuntimeException("KABOOM!")), + new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + context, + 
sinkNodeName, + taskId + ), streamsMetrics, topology ); @@ -1324,7 +1330,7 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) { logCaptureAppender.setThreshold(Level.INFO); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); collector.flush(); final List messages = logCaptureAppender.getMessages(); @@ -1348,7 +1354,7 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin )); assertEquals(1.0, metric.metricValue()); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); collector.flush(); collector.closeClean(); } @@ -1365,7 +1371,7 @@ public void shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDef topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); // With default handler which returns FAIL, flush() throws StreamsException with TimeoutException cause, // otherwise it would throw a TaskCorruptedException with null cause @@ -1386,12 +1392,17 @@ public void shouldNotThrowTaskCorruptedExceptionOnUnknownTopicOrPartitionExcepti logContext, taskId, getExceptionalStreamsProducerOnSend(exception), - new AlwaysContinueProductionExceptionHandler(), + new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + context, + sinkNodeName, + taskId + ), streamsMetrics, topology ); - collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); assertDoesNotThrow(collector::flush); } @@ -1527,7 +1538,13 @@ public void shouldThrowStreamsExceptionUsingDefaultExceptionHandler() { @Test public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { - final RecordCollector collector = newRecordCollector(new AlwaysContinueProductionExceptionHandler()); + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, + context, + sinkNodeName, + taskId, + ProductionExceptionHandler.SerializationExceptionOrigin.KEY + )); collector.initialize(); collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context); @@ -1547,11 +1564,53 @@ public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() { } } + @Test + public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail() { + try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, + context, + sinkNodeName, + taskId, + 
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE + )); + collector.initialize(); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> collector.send(topic, "key", "val", null, 0, null, stringSerializer, errorSerializer, sinkNodeName, context)); + + assertInstanceOf(RuntimeException.class, exception.getCause()); + assertEquals("KABOOM!", exception.getCause().getMessage()); + } + } + + @Test + public void shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionExceptionHandlerRepliesWithFail() { + try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock( + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, + context, + sinkNodeName, + taskId, + ProductionExceptionHandler.SerializationExceptionOrigin.KEY + )); + collector.initialize(); + + final StreamsException exception = assertThrows( + StreamsException.class, + () -> collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context)); + + assertInstanceOf(RuntimeException.class, exception.getCause()); + assertEquals("KABOOM!", exception.getCause().getMessage()); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void shouldNotCallProductionExceptionHandlerOnClassCastException() { try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) { - final RecordCollector collector = newRecordCollector(new AlwaysContinueProductionExceptionHandler()); + final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE)); collector.initialize(); assertThat(mockProducer.history().isEmpty(), equalTo(true)); @@ -1637,7 +1696,7 @@ private static class ErrorStringSerializer extends StringSerializer { @Override public byte[] serialize(final String topic, final Headers headers, final String data) { - throw new SerializationException("Not Supported"); + throw new SerializationException("KABOOM!"); } } @@ -1705,4 +1764,68 @@ public byte[] serialize(final String topic, final Headers headers, final String return serialize(topic, data); } } + + public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler { + private final ProductionExceptionHandlerResponse response; + private InternalProcessorContext expectedContext; + private String expectedProcessorNodeId; + private TaskId expectedTaskId; + private SerializationExceptionOrigin expectedSerializationExceptionOrigin; + + public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response) { + this.response = response; + } + + public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response, + final InternalProcessorContext context, + final String processorNodeId, + final TaskId taskId) { + this(response); + this.expectedContext = context; + this.expectedProcessorNodeId = processorNodeId; + this.expectedTaskId = taskId; + } + + public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response, + final InternalProcessorContext context, + final String processorNodeId, + final TaskId taskId, + final SerializationExceptionOrigin origin) { + this(response, context, processorNodeId, taskId); + this.expectedSerializationExceptionOrigin = origin; + } + + @Override + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord 
record, + final Exception exception) { + assertInputs(context, exception); + return response; + } + + @Override + public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + assertInputs(context, exception); + assertEquals(expectedSerializationExceptionOrigin, origin); + return response; + } + + @Override + public void configure(final Map configs) { + // do nothing + } + + private void assertInputs(final ErrorHandlerContext context, final Exception exception) { + assertEquals(expectedContext.recordContext().topic(), context.topic()); + assertEquals(expectedContext.recordContext().partition(), context.partition()); + assertEquals(expectedContext.recordContext().offset(), context.offset()); + assertEquals(expectedProcessorNodeId, context.processorNodeId()); + assertEquals(expectedTaskId, context.taskId()); + assertInstanceOf(RuntimeException.class, exception); + assertEquals("KABOOM!", exception.getMessage()); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 237d1c665b874..ff9080a2de166 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -239,6 +239,13 @@ public InternalMockProcessorContext(final File stateDir, appConfigs(), IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, false); + this.recordContext = new ProcessorRecordContext( + 0, + 0, + 0, + "topic", + new RecordHeaders() + ); } @Override
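
For reference, below is a minimal sketch of a custom handler written against the new ErrorHandlerContext-based API introduced by this patch. Only the interface signatures come from the change itself; the class name, the drop-on-RecordTooLargeException policy, and the logging are illustrative assumptions.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

/**
 * Illustrative handler (not part of this patch): drops records that are too large
 * to produce and fails on all other errors, logging the source metadata that the
 * new ErrorHandlerContext parameter exposes.
 */
public class DropOversizedRecordsHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // The context identifies where the failed record originated; the deprecated
            // handle(ProducerRecord, Exception) overload could not provide this.
            System.err.printf("Dropping oversized record: task=%s node=%s source=%s-%d@%d%n",
                context.taskId(), context.processorNodeId(),
                context.topic(), context.partition(), context.offset());
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                           final ProducerRecord record,
                                                                           final Exception exception,
                                                                           final SerializationExceptionOrigin origin) {
        // origin distinguishes KEY from VALUE serialization failures.
        System.err.printf("Serialization of record %s failed in task %s: %s%n",
            origin, context.taskId(), exception.getMessage());
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}

A handler like this is registered through the existing default.production.exception.handler config (StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG). Existing implementations keep compiling and running unchanged, because the new overloads are default methods that delegate to the deprecated single-record variants.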