diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 2452798d48551..89a5413e00cf4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -130,8 +130,13 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); - for (Record record : recordBatchAndRecords.records) - builder.append(record); + for (Record record : recordBatchAndRecords.records) { + // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported + if (magic > RecordBatch.MAGIC_VALUE_V1) + builder.append(record); + else + builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value()); + } builder.close(); return builder; diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 53ac200358626..fdd3ede16cc02 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -40,6 +42,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assert.assertArrayEquals; public class FileRecordsTest { @@ -358,6 +361,11 @@ public void testConversion() throws IOException { private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException { List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); + + Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()), + new RecordHeader("headerKey2", "headerValue2".getBytes()), + new RecordHeader("headerKey3", "headerValue3".getBytes())}; + List records = asList( new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()), @@ -366,9 +374,10 @@ private void doTestConversion(CompressionType compressionType, byte toMagic) thr new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()), new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()), new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()), - new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()), + new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers), new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()), - new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes())); + new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers)); + assertEquals("incorrect test setup", offsets.size(), records.size()); ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, @@ -452,6 +461,7 @@ private void verifyConvertedRecords(List initialRecords, assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp()); assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME)); assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE)); + assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers()); } i += 1; }