diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java index 90811edf37f63..3d7f91e4fc7e5 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/DeadLetterQueueIntegrationTest.java @@ -140,7 +140,7 @@ public void shouldSendToDlqAndFailFromDsl() throws Exception { assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic"); assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic"); - assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process")); assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value())); @@ -181,7 +181,7 @@ public void shouldSendToDlqAndContinueFromDsl() throws Exception { assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic"); assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic"); - assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process")); assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value())); @@ -220,7 +220,7 @@ public void shouldSendToDlqAndFailFromProcessorAPI() throws Exception { assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic"); assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic"); - assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process")); assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value())); @@ -261,7 +261,7 @@ public void shouldSendToDlqAndContinueFromProcessorAPI() throws Exception { assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic"); assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic"); - assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process")); assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value())); @@ -303,7 +303,7 @@ public void shouldSendToDlqAndFailFromDeserializationError() throws Exception { assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic"); assertEquals("value", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic"); - assertEquals("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals("org.apache.kafka.common.errors.SerializationException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); assertEquals("Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8")); assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value())); @@ -344,7 +344,7 @@ public void shouldSendToDlqAndContinueFromDeserializationError() throws Exceptio assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic"); assertEquals("value", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic"); - assertEquals("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals("org.apache.kafka.common.errors.SerializationException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value())); assertEquals("Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8")); assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value())); diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java index d3fd221cea8a5..10b708cb74cbe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ErrorHandlerContext; @@ -83,12 +84,18 @@ public static ProducerRecord buildDeadLetterQueueRecord(final St throw new InvalidConfigurationException(String.format("%s cannot be null while building dead letter queue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); } final ProducerRecord producerRecord = new ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, value); + // Copy original headers from record that causes exception + if (context.headers() != null) { + for (final Header header : context.headers()) { + producerRecord.headers().add(header); + } + } final StringWriter stackTraceStringWriter = new StringWriter(); final PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter); e.printStackTrace(stackTracePrintWriter); try (final StringSerializer stringSerializer = new StringSerializer()) { - producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.toString())); + producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.getClass().getName())); producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, stringSerializer.serialize(null, e.getMessage())); producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, stringSerializer.serialize(null, stackTraceStringWriter.toString())); producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME, stringSerializer.serialize(null, context.topic())); diff --git a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java index 915f3a3f650bb..bf2bc547c54be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java @@ -82,11 +82,15 @@ public void checkDeadLetterQueueRecords() { assertEquals(1, dlqRecord.timestamp()); assertEquals(key, new String(dlqRecord.key())); assertEquals(value, new String(dlqRecord.value())); - assertEquals(exception.toString(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value())); + assertEquals(exception.getClass().getName(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value())); assertEquals(exception.getMessage(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value())); assertEquals("source", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value())); assertEquals("3", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value())); assertEquals("2", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value())); + // Verify original source headers are preserved + assertEquals("hello world", + stringDeserializer.deserialize(null, + headers.lastHeader("sourceHeader").value())); } @Test