Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R

MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
for (Record record : recordBatchAndRecords.records)
builder.append(record);
for (Record record : recordBatchAndRecords.records) {
// Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
if (magic > RecordBatch.MAGIC_VALUE_V1)
builder.append(record);
else
builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
}

builder.close();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.kafka.common.record;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
Expand All @@ -40,6 +42,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertArrayEquals;

public class FileRecordsTest {

Expand Down Expand Up @@ -358,6 +361,11 @@ public void testConversion() throws IOException {

private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);

Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
new RecordHeader("headerKey2", "headerValue2".getBytes()),
new RecordHeader("headerKey3", "headerValue3".getBytes())};

List<SimpleRecord> records = asList(
new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
Expand All @@ -366,9 +374,10 @@ private void doTestConversion(CompressionType compressionType, byte toMagic) thr
new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()),
new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
assertEquals("incorrect test setup", offsets.size(), records.size());

ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
Expand Down Expand Up @@ -452,6 +461,7 @@ private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
}
i += 1;
}
Expand Down