Merged

20 commits
e9c0e38
KAFKA-13093: Log compaction should write new segments with record ver…
ijuma Dec 26, 2024
8256a06
Remove warning related to consumers earlier than 0.10.1 - they are no…
ijuma Dec 27, 2024
90c86db
UnifiedLog.maybeCreateLeaderEpochCache -> createLeaderEpochCache sinc…
ijuma Dec 27, 2024
bb28607
Add back `testLeaderEpochCacheCreatedAfterMessageFormatUpgrade` as it…
ijuma Dec 27, 2024
63b10f7
Remove unused parameters for LogCleaner.filterInfo
ijuma Dec 29, 2024
021d86d
Remove unnecessary version check in LogCleaner.filterInfo
ijuma Dec 29, 2024
b92c3d6
Adjust the conversion of V0 to V2 during compaction
ijuma Dec 29, 2024
58e1547
Add tests
ijuma Dec 29, 2024
593dc83
Remove unused imports
ijuma Dec 29, 2024
a50bd5d
Remove unused method parameter
ijuma Dec 29, 2024
8ad3400
Merge branch 'trunk' into kafka-13093-log-compaction-write-record-v2
ijuma Jan 4, 2025
6689ade
Merge remote-tracking branch 'apache-github/trunk' into kafka-13093-l…
ijuma Jan 8, 2025
ed6e240
Change UnifiedLog.leaderEpochCache from Option[LeaderEpochFileCache] …
ijuma Jan 8, 2025
b649179
Fix issue where method was not overriden due to change in signature
ijuma Jan 8, 2025
7628099
Fix NPE in `UnifiedLog.delete`
ijuma Jan 8, 2025
56a93df
Adjust assert to address review comment
ijuma Jan 8, 2025
fb0f06f
leaderEpochCache is never null
ijuma Jan 8, 2025
ef3ec42
Add back test with adjustments (including name)
ijuma Jan 8, 2025
525d8df
Fix test
ijuma Jan 8, 2025
97448ce
Rename `initializeLeaderEpochCache` to `reinitializeLeaderEpochCache`
ijuma Jan 8, 2025
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.message.KRaftVersionRecord;
@@ -137,31 +136,25 @@ public Integer firstBatchSize() {
     /**
      * Filter the records into the provided ByteBuffer.
      *
-     * @param partition The partition that is filtered (used only for logging)
      * @param filter The filter function
      * @param destinationBuffer The byte buffer to write the filtered records to
-     * @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch
-     *                           exceeds this after filtering, we log a warning, but the batch will still be
-     *                           created.
      * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported. For small
      *                                    record batches, allocating a potentially large buffer (64 KB for LZ4) will
      *                                    dominate the cost of decompressing and iterating over the records in the
      *                                    batch. As such, a supplier that reuses buffers will have a significant
      *                                    performance impact.
      * @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer
      */
-    public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
-                                 int maxRecordBatchSize, BufferSupplier decompressionBufferSupplier) {
-        return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
+    public FilterResult filterTo(RecordFilter filter, ByteBuffer destinationBuffer, BufferSupplier decompressionBufferSupplier) {
+        return filterTo(batches(), filter, destinationBuffer, decompressionBufferSupplier);
     }
 
     /**
      * Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record)
      * to the delete horizon of the tombstones or txn markers which are present in the batch.
      */
-    private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
-                                         RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
-                                         BufferSupplier decompressionBufferSupplier) {
+    private static FilterResult filterTo(Iterable<MutableRecordBatch> batches, RecordFilter filter,
+                                         ByteBuffer destinationBuffer, BufferSupplier decompressionBufferSupplier) {
         FilterResult filterResult = new FilterResult(destinationBuffer);
         ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
         for (MutableRecordBatch batch : batches) {
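As the javadoc above stresses, a BufferSupplier that reuses buffers matters whenever compressed batches are filtered. A minimal caller sketch under that assumption (`segmentsToClean`, `filter`, and `destinationBuffer` are hypothetical placeholders, not names from this PR):

```java
// Reuse one decompression buffer across batches instead of allocating a
// fresh (potentially 64 KB for LZ4) buffer for every compressed batch.
BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
try {
    for (MemoryRecords segmentRecords : segmentsToClean) { // hypothetical source of records
        MemoryRecords.FilterResult result =
            segmentRecords.filterTo(filter, destinationBuffer, decompressionBufferSupplier);
        // ... flush result.outputBuffer() to the cleaned segment ...
        destinationBuffer.clear();
    }
} finally {
    decompressionBufferSupplier.close(); // release any cached buffers
}
```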
@@ -174,15 +167,9 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             if (batchRetention == BatchRetention.DELETE)
                 continue;
 
-            // We use the absolute offset to decide whether to retain the message or not. Due to KAFKA-4298, we have to
-            // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
-            // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
-            // recopy the messages to the destination buffer.
-            byte batchMagic = batch.magic();
-            List<Record> retainedRecords = new ArrayList<>();
-
-            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
-                batchMagic, true, retainedRecords);
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult,
+                filter);
+            List<Record> retainedRecords = iterationResult.retainedRecords;
             boolean containsTombstones = iterationResult.containsTombstones;
             boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
             long maxOffset = iterationResult.maxOffset;
@@ -191,8 +178,8 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
                 // we check if the delete horizon should be set to a new value
                 // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
                 // if the batch does not contain tombstones, then we don't need to overwrite batch
-                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
-                    && batch.deleteHorizonMs().isEmpty();
+                boolean needToSetDeleteHorizon = (containsTombstones || containsMarkerForEmptyTxn) &&
+                    batch.deleteHorizonMs().isEmpty();
                 if (writeOriginalBatch && !needToSetDeleteHorizon) {
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
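For reviewers tracing the simplification above: the delete-horizon rule itself is unchanged. A one-line restatement of the logic that follows in this method, using the same names (a paraphrase, not new code in this PR):

```java
// First copy of a batch with tombstones/empty-txn markers stamps a new horizon;
// later copies carry the existing horizon forward until it expires.
long deleteHorizonMs = batch.deleteHorizonMs().isEmpty()
    ? filter.currentTime + filter.deleteRetentionMs
    : batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
```

The `batch.magic() >= RecordBatch.MAGIC_VALUE_V2` guard could be dropped from `needToSetDeleteHorizon` because retained batches are now always rewritten as V2.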
@@ -202,26 +189,21 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
                         deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
                     else
                         deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
-                    try (final MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs)) {
+                    try (final MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords,
+                            bufferOutputStream, deleteHorizonMs)) {
                         MemoryRecords records = builder.build();
                         int filteredBatchSize = records.sizeInBytes();
-                        if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
-                            log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
-                                    "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
-                                    "increase their fetch sizes.",
-                                partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
-
                         MemoryRecordsBuilder.RecordsInfo info = builder.info();
                         filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
                             maxOffset, retainedRecords.size(), filteredBatchSize);
                     }
                 }
             } else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
-                if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
+                if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) // should never happen
                     throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
 
                 bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
-                DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
+                DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), RecordBatch.CURRENT_MAGIC_VALUE, batch.producerId(),
                     batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
                     batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
                     batch.isTransactional(), batch.isControlBatch());
@@ -243,23 +225,18 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
     private static BatchFilterResult filterBatch(RecordBatch batch,
                                                  BufferSupplier decompressionBufferSupplier,
                                                  FilterResult filterResult,
-                                                 RecordFilter filter,
-                                                 byte batchMagic,
-                                                 boolean writeOriginalBatch,
-                                                 List<Record> retainedRecords) {
-        long maxOffset = -1;
-        boolean containsTombstones = false;
+                                                 RecordFilter filter) {
         try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+            long maxOffset = -1;
+            boolean containsTombstones = false;
+            // Convert records with old record versions
+            boolean writeOriginalBatch = batch.magic() >= RecordBatch.CURRENT_MAGIC_VALUE;
+            List<Record> retainedRecords = new ArrayList<>();
             while (iterator.hasNext()) {
                 Record record = iterator.next();
                 filterResult.messagesRead += 1;
 
                 if (filter.shouldRetainRecord(batch, record)) {
-                    // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                    // the corrupted batch with correct data.
-                    if (!record.hasMagic(batchMagic))
-                        writeOriginalBatch = false;
-
                     if (record.offset() > maxOffset)
                         maxOffset = record.offset();
 
@@ -272,17 +249,20 @@ private static BatchFilterResult filterBatch(RecordBatch batch,
                     writeOriginalBatch = false;
                 }
             }
-            return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
+            return new BatchFilterResult(retainedRecords, writeOriginalBatch, containsTombstones, maxOffset);
         }
     }
 
     private static class BatchFilterResult {
+        private final List<Record> retainedRecords;
         private final boolean writeOriginalBatch;
         private final boolean containsTombstones;
         private final long maxOffset;
-        private BatchFilterResult(final boolean writeOriginalBatch,
-                                  final boolean containsTombstones,
-                                  final long maxOffset) {
+        private BatchFilterResult(List<Record> retainedRecords,
+                                  final boolean writeOriginalBatch,
+                                  final boolean containsTombstones,
+                                  final long maxOffset) {
+            this.retainedRecords = retainedRecords;
             this.writeOriginalBatch = writeOriginalBatch;
             this.containsTombstones = containsTombstones;
             this.maxOffset = maxOffset;
@@ -293,23 +273,28 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina
                                                              List<Record> retainedRecords,
                                                              ByteBufferOutputStream bufferOutputStream,
                                                              final long deleteHorizonMs) {
-        byte magic = originalBatch.magic();
         Compression compression = Compression.of(originalBatch.compressionType()).build();
-        TimestampType timestampType = originalBatch.timestampType();
+        // V0 has no timestamp type or timestamp, so we set the timestamp type to CREATE_TIME and the timestamp to NO_TIMESTAMP.
+        // Note that this differs from produce up-conversion where the timestamp type topic config is used and the log append
+        // time is generated if the config is LOG_APPEND_TIME. The reason for the different behavior is that there is
+        // no appropriate log append time we can generate at compaction time.
+        TimestampType timestampType = originalBatch.timestampType() == TimestampType.NO_TIMESTAMP_TYPE ?
+            TimestampType.CREATE_TIME : originalBatch.timestampType();
         long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
             originalBatch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
-        long baseOffset = magic >= RecordBatch.MAGIC_VALUE_V2 ?
+        long baseOffset = originalBatch.magic() >= RecordBatch.MAGIC_VALUE_V2 ?
             originalBatch.baseOffset() : retainedRecords.get(0).offset();
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
+        // Convert records with older record versions to the current one
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, RecordBatch.CURRENT_MAGIC_VALUE,
             compression, timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
             originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
             originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs);
 
         for (Record record : retainedRecords)
             builder.append(record);
 
-        if (magic >= RecordBatch.MAGIC_VALUE_V2)
+        if (originalBatch.magic() >= RecordBatch.MAGIC_VALUE_V2)
             // we must preserve the last offset from the initial batch in order to ensure that the
             // last sequence number from the batch remains even after compaction. Otherwise, the producer
             // could incorrectly see an out of sequence error.
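Net effect of the changes in this file: the cleaner now always writes retained batches with RecordBatch.CURRENT_MAGIC_VALUE (V2), up-converting V0/V1 input instead of preserving its record version. That is why the KAFKA-4298 corruption workaround and the fetch-size warning could be removed (per the commit messages, consumers older than 0.10.1 are no longer supported). A hedged sketch of the resulting behavior, mirroring the style of the tests below (illustrative only, not a test from this PR):

```java
MemoryRecords v0Records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, Compression.NONE,
    new SimpleRecord("key".getBytes(), "value".getBytes()));

MemoryRecords.FilterResult result = v0Records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
    @Override
    protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
        return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
    }

    @Override
    protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
        return true; // retain everything; we only care about the up-conversion
    }
}, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);

result.outputBuffer().flip();
for (RecordBatch batch : MemoryRecords.readableRecords(result.outputBuffer()).batches()) {
    // Rewritten as V2; V0 carries no timestamp type, so CREATE_TIME is used with NO_TIMESTAMP.
    assert batch.magic() == RecordBatch.CURRENT_MAGIC_VALUE;
    assert batch.timestampType() == TimestampType.CREATE_TIME;
}
```

Note the invariant preserved at the end of buildRetainedRecordsInto: for batches that were already V2, the original last offset (and therefore the last sequence number) is kept, so idempotent producers do not see spurious out-of-sequence errors after compaction.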
@@ -2532,7 +2532,7 @@ public void testUpdatePositionWithLastRecordMissingFromBatch() {
             new SimpleRecord(null, "value".getBytes()));
 
         // Remove the last record to simulate compaction
-        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
+        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
             @Override
             protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                 return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
@@ -2542,7 +2542,7 @@ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
             protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
-        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);
         result.outputBuffer().flip();
         MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 
@@ -2518,7 +2518,7 @@ public void testUpdatePositionWithLastRecordMissingFromBatch() {
             new SimpleRecord(null, "value".getBytes()));
 
         // Remove the last record to simulate compaction
-        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
+        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
             @Override
             protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                 return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
@@ -2528,7 +2528,7 @@ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
             protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
-        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);
         result.outputBuffer().flip();
         MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 
@@ -1249,7 +1249,7 @@ public void testFetchWithLastRecordMissingFromBatch() {
             new SimpleRecord(null, "value".getBytes()));
 
         // Remove the last record to simulate compaction
-        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
+        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
             @Override
             protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                 return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
@@ -1259,7 +1259,7 @@ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
             protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
-        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);
         result.outputBuffer().flip();
         MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 