From 31e9c1479c41cbad55a0eb5b82b89bdc7d969b57 Mon Sep 17 00:00:00 2001 From: Dhruvil Shah Date: Mon, 2 Apr 2018 16:45:24 -0700 Subject: [PATCH 1/5] KAFKA-6739: When down-converting from V2 to V0/V1, broker must ignore any header present in the record. Added a test case to verify sanity when down-converting records containing headers. --- .../kafka/common/record/AbstractRecords.java | 8 ++++++-- .../kafka/common/record/FileRecordsTest.java | 20 +++++++++++++------ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 2452798d48551..f58d77c87a47d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -130,8 +130,12 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); - for (Record record : recordBatchAndRecords.records) - builder.append(record); + for (Record record : recordBatchAndRecords.records) { + if (magic > RecordBatch.MAGIC_VALUE_V1) + builder.append(record); + else + builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value()); + } builder.close(); return builder; diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 53ac200358626..2fb824d122dee 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -36,10 +38,7 @@ import static java.util.Arrays.asList; import static org.apache.kafka.test.TestUtils.tempFile; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class FileRecordsTest { @@ -358,6 +357,13 @@ public void testConversion() throws IOException { private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException { List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); + + Header headers[] = { + new RecordHeader("headerKey1", "headerValue1".getBytes()), + new RecordHeader("headerKey2", "headerValue2".getBytes()), + new RecordHeader("headerKey3", "headerValue3".getBytes()), + }; + List records = asList( new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()), @@ -366,9 +372,10 @@ private void doTestConversion(CompressionType compressionType, byte toMagic) thr new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()), new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()), new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()), - new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()), + new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers), new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()), - new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes())); + new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers)); + assertEquals("incorrect test setup", offsets.size(), records.size()); ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, @@ -452,6 +459,7 @@ private void verifyConvertedRecords(List initialRecords, assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp()); assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME)); assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE)); + assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers()); } i += 1; } From 9c95eed9c72fbfd1a26e2e8e3b8e903a2feb39c1 Mon Sep 17 00:00:00 2001 From: Dhruvil Shah Date: Mon, 2 Apr 2018 16:59:20 -0700 Subject: [PATCH 2/5] Add comment. --- .../java/org/apache/kafka/common/record/AbstractRecords.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index f58d77c87a47d..69e167bf2a0d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -131,6 +131,7 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); for (Record record : recordBatchAndRecords.records) { + // Down-convert this record. Ignore the presence of headers when down-converting to V0 and V1 (see KAFKA-6739). if (magic > RecordBatch.MAGIC_VALUE_V1) builder.append(record); else From d44531d1ad712902a823499617cca8fd62b6b4fc Mon Sep 17 00:00:00 2001 From: Dhruvil Shah Date: Mon, 2 Apr 2018 17:17:06 -0700 Subject: [PATCH 3/5] Make checkStyle happy :-) --- .../kafka/common/record/FileRecordsTest.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 2fb824d122dee..fdd3ede16cc02 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -38,7 +38,11 @@ import static java.util.Arrays.asList; import static org.apache.kafka.test.TestUtils.tempFile; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertArrayEquals; public class FileRecordsTest { @@ -358,11 +362,9 @@ public void testConversion() throws IOException { private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException { List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); - Header headers[] = { - new RecordHeader("headerKey1", "headerValue1".getBytes()), - new RecordHeader("headerKey2", "headerValue2".getBytes()), - new RecordHeader("headerKey3", "headerValue3".getBytes()), - }; + Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()), + new RecordHeader("headerKey2", "headerValue2".getBytes()), + new RecordHeader("headerKey3", "headerValue3".getBytes())}; List records = asList( new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), From 750f66181c84dfcb4592dc0e84573f5ca78e4420 Mon Sep 17 00:00:00 2001 From: Dhruvil Shah Date: Tue, 3 Apr 2018 10:05:37 -0700 Subject: [PATCH 4/5] Address review comment. --- .../java/org/apache/kafka/common/record/AbstractRecords.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 69e167bf2a0d6..e43d0f8e782fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -131,7 +131,8 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); for (Record record : recordBatchAndRecords.records) { - // Down-convert this record. Ignore the presence of headers when down-converting to V0 and V1 (see KAFKA-6739). + // Down-convert this record. Because V0 and V1 do not support headers, ignore its presence when down-converting + // to V0 or V1. if (magic > RecordBatch.MAGIC_VALUE_V1) builder.append(record); else From be6461ecc1f395d0c259c3378fb13b5c3d30c9ea Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 3 Apr 2018 10:36:39 -0700 Subject: [PATCH 5/5] Minor comment tweak --- .../java/org/apache/kafka/common/record/AbstractRecords.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index e43d0f8e782fd..89a5413e00cf4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -131,8 +131,7 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); for (Record record : recordBatchAndRecords.records) { - // Down-convert this record. Because V0 and V1 do not support headers, ignore its presence when down-converting - // to V0 or V1. + // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported if (magic > RecordBatch.MAGIC_VALUE_V1) builder.append(record); else