From 9358b90b6a4ca08771dbd010a9e43412e6b67dad Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 28 Oct 2020 16:56:38 +0800 Subject: [PATCH] KAFKA-10645: add null check to the array/Iterable values in RecordHeaders constructor --- .../header/internals/RecordHeaders.java | 19 +++++++++++++++++++ .../header/internals/RecordHeadersTest.java | 10 ++++++++++ 2 files changed, 29 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java index 1277408270800..aa091036aeb79 100644 --- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java +++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java @@ -38,6 +38,7 @@ public RecordHeaders() { } public RecordHeaders(Header[] headers) { + checkNullHeader(headers); if (headers == null) { this.headers = new ArrayList<>(); } else { @@ -46,6 +47,7 @@ public RecordHeaders(Header[] headers) { } public RecordHeaders(Iterable
headers) { + checkNullHeader(headers); //Use efficient copy constructor if possible, fallback to iteration otherwise if (headers == null) { this.headers = new ArrayList<>(); @@ -117,6 +119,23 @@ public Header[] toArray() { return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[headers.size()]); } + private void checkNullHeader(Header[] headers) throws IllegalArgumentException { + if (headers != null) { + if (!Arrays.stream(headers).allMatch(Objects::nonNull)) { + throw new IllegalArgumentException("header value cannot be null."); + } + } + } + + private void checkNullHeader(Iterable
headers) throws IllegalArgumentException { + if (headers != null) { + headers.forEach(header -> { + if (header == null) + throw new IllegalArgumentException("header value cannot be null."); + }); + } + } + private void checkKey(String key) { if (key == null) throw new IllegalArgumentException("key cannot be null."); diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java index 5b9f95ea91f18..bee117bdb1925 100644 --- a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java @@ -211,6 +211,16 @@ public void shouldThrowNpeWhenAddingNullHeader() { new RecordHeaders().add(null); } + @Test(expected = java.lang.IllegalArgumentException.class) + public void shouldThrowExceptionWhenPassingArrayWithNullValueInConstructor() { + new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes()), null}); + } + + @Test(expected = java.lang.IllegalArgumentException.class) + public void shouldThrowExceptionWhenPassingIterableWithNullValueInConstructor() { + new RecordHeaders(Arrays.asList(new Header[] {new RecordHeader("key", "value".getBytes()), null})); + } + private int getCount(Headers headers) { int count = 0; Iterator
headerIterator = headers.iterator();