From 2be7af80c18cf9c9885dca6f5963114aa0aba0b6 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Wed, 24 Jul 2024 09:38:33 +0200 Subject: [PATCH 1/5] KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz Co-authored-by: loicgreffier KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz Co-authored-by: loicgreffier KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz Co-authored-by: loicgreffier KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz Co-authored-by: loicgreffier KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../processor/internals/ProcessorNode.java | 8 ++++-- .../internals/ProcessorRecordContext.java | 9 ------ .../internals/ProcessorNodeTest.java | 28 +++++++++++++++++-- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 763edc9a04500..7e99486e37d5d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -213,9 +213,13 @@ public void process(final Record record) { internalProcessorContext.currentNode().name(), internalProcessorContext.taskId()); - final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler - .handle(errorHandlerContext, record, e); + final ProcessingExceptionHandler.ProcessingHandlerResponse response; + try { + response = processingExceptionHandler.handle(errorHandlerContext, record, e); + } catch (final Exception fatalUserException) { + throw new StreamsException("Fatal user code error in processing error callback", fatalUserException); + } if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { log.error("Processing exception handler is set to fail upon" + " a processing error. If you would rather have the streaming pipeline" + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 839baaad87528..3279e770d41e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -31,13 +31,11 @@ import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray; public class ProcessorRecordContext implements RecordContext, RecordMetadata { - private final long timestamp; private final long offset; private final String topic; private final int partition; private final Headers headers; - public ProcessorRecordContext(final long timestamp, final long offset, final int partition, @@ -49,32 +47,26 @@ public ProcessorRecordContext(final long timestamp, this.partition = partition; this.headers = Objects.requireNonNull(headers); } - @Override public long offset() { return offset; } - @Override public long timestamp() { return timestamp; } - @Override public String topic() { return topic; } - @Override public int partition() { return partition; } - @Override public Headers headers() { return headers; } - public long residentMemorySizeEstimate() { long size = 0; size += Long.BYTES; // value.context.timestamp @@ -92,7 +84,6 @@ public long residentMemorySizeEstimate() { } return size; } - public byte[] serialize() { final byte[] topicBytes = topic.getBytes(UTF_8); final byte[][] headerKeysBytes; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index df3f927686362..e18c4feae1ab1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -100,7 +100,7 @@ public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRe new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); - node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext)); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext, false)); final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); @@ -116,7 +116,7 @@ public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandle new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); - node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext)); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, false)); assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); } @@ -146,6 +146,21 @@ public void shouldNotHandleInternalExceptionsThrownDuringProcessing(final String verify(processingExceptionHandler, never()).handle(any(), any(), any()); } + @Test + public void shouldThrowStreamsExceptionWhenProcessingExceptionHandlerThrowsAnException() { + final ProcessorNode node = + new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); + + final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); + node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, true)); + + final StreamsException streamsException = assertThrows(StreamsException.class, + () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); + + final String msg = streamsException.getMessage(); + assertTrue(msg.contains("Fatal user code error in processing error callback")); + } + private static class ExceptionalProcessor implements Processor { @Override public void init(final ProcessorContext context) { @@ -323,10 +338,14 @@ public static class ProcessingExceptionHandlerMock implements ProcessingExceptio private final ProcessingExceptionHandler.ProcessingHandlerResponse response; private final InternalProcessorContext internalProcessorContext; + private final boolean shouldThrowException; + public ProcessingExceptionHandlerMock(final ProcessingExceptionHandler.ProcessingHandlerResponse response, - final InternalProcessorContext internalProcessorContext) { + final InternalProcessorContext internalProcessorContext, + final boolean shouldThrowException) { this.response = response; this.internalProcessorContext = internalProcessorContext; + this.shouldThrowException = shouldThrowException; } @Override @@ -341,6 +360,9 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa assertTrue(exception instanceof RuntimeException); assertEquals("Processing exception should be caught and handled by the processing exception handler.", exception.getMessage()); + if (shouldThrowException) { + throw new RuntimeException("KABOOM!"); + } return response; } From b93561bf8913460c3d7cf5e21afa76843bcea584 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Tue, 30 Jul 2024 15:33:33 +0200 Subject: [PATCH 2/5] KAFKA-16448 Handle fatal user exception during processing error Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../processor/internals/ProcessorNode.java | 2 +- ...essingExceptionHandlerIntegrationTest.java | 146 ++++++++++++++++++ 2 files changed, 147 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 7e99486e37d5d..175c9e104efbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -218,7 +218,7 @@ public void process(final Record record) { try { response = processingExceptionHandler.handle(errorHandlerContext, record, e); } catch (final Exception fatalUserException) { - throw new StreamsException("Fatal user code error in processing error callback", fatalUserException); + throw new FailedProcessingException(fatalUserException); } if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { log.error("Processing exception handler is set to fail upon" + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index 6c1a64344ec76..61b5ed16bb1ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -45,10 +45,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -149,9 +151,150 @@ public void shouldContinueWhenProcessingExceptionOccurs() { } } + @Test + public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventError = new KeyValue<>("ID123-2-ERR", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); + assertTrue(e.getMessage().contains("Exception caught in process. " + + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, " + + "partition=0, offset=1, stacktrace=java.lang.RuntimeException: " + + "Exception should be handled by processing exception handler")); + assertFalse(isExecuted.get()); + } + } + + @Test + public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinueProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventFalse = new KeyValue<>("ID123-2-ERR", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + inputTopic.pipeInput(eventFalse.key, eventFalse.value, Instant.EPOCH); + assertFalse(isExecuted.get()); + } + } + + @Test + public void shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); + assertEquals("KABOOM!", e.getCause().getMessage()); + assertFalse(isExecuted.get()); + } + } + + @Test + public void shouldStopProcessingWhenFatalUserExceptionInContinueProcessingExceptionHandler() { + final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); + final KeyValue eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2"); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + final AtomicBoolean isExecuted = new AtomicBoolean(false); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .map(KeyValue::new) + .mapValues(value -> value) + .process(runtimeErrorProcessorSupplierMock()) + .map((k, v) -> { + isExecuted.set(true); + return KeyValue.pair(k, v); + }) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + isExecuted.set(false); + inputTopic.pipeInput(event.key, event.value, Instant.EPOCH); + assertTrue(isExecuted.get()); + isExecuted.set(false); + final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); + assertEquals("KABOOM!", e.getCause().getMessage()); + assertFalse(isExecuted.get()); + } + } + public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { @Override public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + if (((String) record.key()).contains("FATAL")) { + throw new RuntimeException("KABOOM!"); + } assertProcessingExceptionHandlerInputs(context, record, exception); return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE; } @@ -165,6 +308,9 @@ public void configure(final Map configs) { public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler { @Override public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { + if (((String) record.key()).contains("FATAL")) { + throw new RuntimeException("KABOOM!"); + } assertProcessingExceptionHandlerInputs(context, record, exception); return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL; } From 92235e5fce53cf2e1f5de0a19566c85a725448ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Tue, 30 Jul 2024 22:44:23 +0200 Subject: [PATCH 3/5] KAFKA-16448: Fix processing exception handler throwing exception itself Co-authored-by: Dabz Co-authored-by: loicgreffier --- .../processor/internals/ProcessorNodeTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index e18c4feae1ab1..9fe9244e0aa1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -60,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -147,18 +148,18 @@ public void shouldNotHandleInternalExceptionsThrownDuringProcessing(final String } @Test - public void shouldThrowStreamsExceptionWhenProcessingExceptionHandlerThrowsAnException() { + public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() { final ProcessorNode node = new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet()); final InternalProcessorContext internalProcessorContext = mockInternalProcessorContext(); node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, true)); - final StreamsException streamsException = assertThrows(StreamsException.class, + final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); - final String msg = streamsException.getMessage(); - assertTrue(msg.contains("Fatal user code error in processing error callback")); + assertInstanceOf(RuntimeException.class, failedProcessingException.getCause()); + assertEquals("KABOOM!", failedProcessingException.getCause().getMessage()); } private static class ExceptionalProcessor implements Processor { @@ -357,7 +358,7 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa assertEquals(internalProcessorContext.taskId(), context.taskId()); assertEquals(KEY, record.key()); assertEquals(VALUE, record.value()); - assertTrue(exception instanceof RuntimeException); + assertInstanceOf(RuntimeException.class, exception); assertEquals("Processing exception should be caught and handled by the processing exception handler.", exception.getMessage()); if (shouldThrowException) { From 458ea81206970e13053b653ce2144c94ec86110e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Wed, 31 Jul 2024 01:18:04 +0200 Subject: [PATCH 4/5] KAFKA-16448: Fix spaces --- .../processor/internals/ProcessorRecordContext.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 3279e770d41e0..6c55c75fa34a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -31,11 +31,13 @@ import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray; public class ProcessorRecordContext implements RecordContext, RecordMetadata { + private final long timestamp; private final long offset; private final String topic; private final int partition; private final Headers headers; + public ProcessorRecordContext(final long timestamp, final long offset, final int partition, @@ -47,26 +49,32 @@ public ProcessorRecordContext(final long timestamp, this.partition = partition; this.headers = Objects.requireNonNull(headers); } + @Override public long offset() { return offset; } + @Override public long timestamp() { return timestamp; } + @Override public String topic() { return topic; } + @Override public int partition() { return partition; } + @Override public Headers headers() { return headers; } + public long residentMemorySizeEstimate() { long size = 0; size += Long.BYTES; // value.context.timestamp @@ -84,6 +92,7 @@ public long residentMemorySizeEstimate() { } return size; } + public byte[] serialize() { final byte[] topicBytes = topic.getBytes(UTF_8); final byte[][] headerKeysBytes; From 95870692de03fdccca4017e2cfa561b71063f41b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Wed, 31 Jul 2024 01:18:42 +0200 Subject: [PATCH 5/5] KAFKA-16448: Fix spaces --- .../streams/processor/internals/ProcessorRecordContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 6c55c75fa34a7..839baaad87528 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -92,7 +92,7 @@ public long residentMemorySizeEstimate() { } return size; } - + public byte[] serialize() { final byte[] topicBytes = topic.getBytes(UTF_8); final byte[][] headerKeysBytes;