@@ -140,7 +140,7 @@ public void shouldSendToDlqAndFailFromDsl() throws Exception {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic");

assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process"));
assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -181,7 +181,7 @@ public void shouldSendToDlqAndContinueFromDsl() throws Exception {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic");

assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process"));
assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -220,7 +220,7 @@ public void shouldSendToDlqAndFailFromProcessorAPI() throws Exception {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic");

assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process"));
assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -261,7 +261,7 @@ public void shouldSendToDlqAndContinueFromProcessorAPI() throws Exception {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic");
assertEquals("KABOOM", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic");

assertEquals("java.lang.RuntimeException: KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("java.lang.RuntimeException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("KABOOM", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.streams.integration.DeadLetterQueueIntegrationTest$1.process"));
assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -303,7 +303,7 @@ public void shouldSendToDlqAndFailFromDeserializationError() throws Exception {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic");
assertEquals("value", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic");

assertEquals("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("org.apache.kafka.common.errors.SerializationException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8"));
assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
@@ -344,7 +344,7 @@ public void shouldSendToDlqAndContinueFromDeserializationError() throws Exception {
assertEquals("key", new String(dlqRecords.get(0).key()), "Output record should be sent to DLQ topic");
assertEquals("value", new String(dlqRecords.get(0).value()), "Output record should be sent to DLQ topic");

assertEquals("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("org.apache.kafka.common.errors.SerializationException", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals("Size of data received by LongDeserializer is not 8", new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertTrue(new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_STACKTRACE_NAME).value()).contains("org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8"));
assertEquals(INPUT_TOPIC, new String(dlqRecords.get(0).headers().lastHeader(HEADER_ERRORS_TOPIC_NAME).value()));
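The assertions above spell out the per-record contract of the dead letter queue record for DSL failures, Processor API failures, and deserialization errors: the failed record's key and value, plus headers carrying the exception class name, exception message, stack trace, and the source topic, partition, and offset. Below is a minimal sketch (not part of this change) of reading that contract back with a plain Kafka consumer; the bootstrap address, group id, and DLQ topic name are placeholders.

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DlqHeaderInspector {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector");           // placeholder
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("streams-dlq")); // placeholder DLQ topic name
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                // Each DLQ record carries the failed record's key/value plus error metadata
                // (and, with this change, the original record headers) as headers.
                for (final Header header : record.headers()) {
                    System.out.printf("%s = %s%n", header.key(),
                        header.value() == null ? "null" : new String(header.value(), StandardCharsets.UTF_8));
                }
            }
        }
    }
}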
@@ -19,6 +19,7 @@

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
@@ -83,12 +84,18 @@ public static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final St
throw new InvalidConfigurationException(String.format("%s cannot be null while building dead letter queue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, value);
+ // Copy the original headers from the record that caused the exception
+ if (context.headers() != null) {
+     for (final Header header : context.headers()) {
+         producerRecord.headers().add(header);
+     }
+ }
final StringWriter stackTraceStringWriter = new StringWriter();
final PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter);
e.printStackTrace(stackTracePrintWriter);

try (final StringSerializer stringSerializer = new StringSerializer()) {
- producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.toString()));
+ producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.getClass().getName()));
producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, stringSerializer.serialize(null, e.getMessage()));
producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, stringSerializer.serialize(null, stackTraceStringWriter.toString()));
producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME, stringSerializer.serialize(null, context.topic()));
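The change to buildDeadLetterQueueRecord above does two things: it copies the headers of the record that caused the exception onto the DLQ producer record, and it writes only the exception class name (e.getClass().getName() instead of e.toString()) into the exception-name header, leaving the message in its own header. The following is a standalone sketch of the same pattern outside Streams internals; the class name, method name, and header keys are illustrative placeholders, not the actual ExceptionHandlerUtils code or its header constants.

import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public final class DlqRecordSketch {

    static ProducerRecord<byte[], byte[]> toDlqRecord(final ConsumerRecord<byte[], byte[]> failed,
                                                      final Exception exception,
                                                      final String dlqTopic) {
        // Keep the original key, value, and timestamp; let the producer choose the partition.
        final ProducerRecord<byte[], byte[]> dlqRecord =
            new ProducerRecord<>(dlqTopic, null, failed.timestamp(), failed.key(), failed.value());

        // Preserve the headers of the record that triggered the failure.
        for (final Header header : failed.headers()) {
            dlqRecord.headers().add(header);
        }

        // Attach error metadata: class name only (not toString()), message, and stack trace.
        final StringWriter stackTrace = new StringWriter();
        exception.printStackTrace(new PrintWriter(stackTrace));
        dlqRecord.headers().add("errors.exception.class.name",
            exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        dlqRecord.headers().add("errors.exception.message",
            String.valueOf(exception.getMessage()).getBytes(StandardCharsets.UTF_8));
        dlqRecord.headers().add("errors.exception.stacktrace",
            stackTrace.toString().getBytes(StandardCharsets.UTF_8));
        return dlqRecord;
    }
}

Keeping the class name and the message in separate headers, as this change does, lets a DLQ consumer filter or group failures by exception type without parsing the message text.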
@@ -82,11 +82,15 @@ public void checkDeadLetterQueueRecords() {
assertEquals(1, dlqRecord.timestamp());
assertEquals(key, new String(dlqRecord.key()));
assertEquals(value, new String(dlqRecord.value()));
- assertEquals(exception.toString(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
+ assertEquals(exception.getClass().getName(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals(exception.getMessage(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertEquals("source", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value()));
assertEquals("3", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value()));
assertEquals("2", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value()));
+ // Verify original source headers are preserved
+ assertEquals("hello world",
+     stringDeserializer.deserialize(null,
+         headers.lastHeader("sourceHeader").value()));
}

@Test