diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 13cfdb82bd0a8..5f8554e45560b 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -107,7 +107,7 @@
-
+
@@ -124,7 +124,7 @@
-
+
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 83637640af49d..ea6c2b23bbad1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -217,6 +217,16 @@ public boolean isControlBatch() {
return false;
}
+ @Override
+ public long deleteHorizonMs() {
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean hasDeleteHorizonMs() {
+ return false;
+ }
+
/**
* Get an iterator for the nested entries contained within this batch. Note that
* if the batch is not compressed, then this method will return an iterator over the
@@ -468,6 +478,16 @@ public long offset() {
return buffer.getLong(OFFSET_OFFSET);
}
+ @Override
+ public long deleteHorizonMs() {
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean hasDeleteHorizonMs() {
+ return false;
+ }
+
@Override
public LegacyRecord outerRecord() {
return record;
@@ -557,6 +577,16 @@ public long baseOffset() {
return loadFullBatch().baseOffset();
}
+ @Override
+ public long deleteHorizonMs() {
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean hasDeleteHorizonMs() {
+ return false;
+ }
+
@Override
public long lastOffset() {
return offset;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index d4a9587ffa56e..8fbdcc1ef4cce 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -89,11 +89,15 @@
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
+ * The delete horizon flag (the sixth bit) indicates whether the first timestamp of the batch has been set to
+ * the time after which the tombstones / transaction markers in the batch may be removed. If the flag is set,
+ * the first timestamp is the delete horizon; otherwise, it is merely the first timestamp of the record batch.
+ *
* The current attributes are given below:
*
- * -------------------------------------------------------------------------------------------------
- * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
- * -------------------------------------------------------------------------------------------------
+ * ---------------------------------------------------------------------------------------------------------------------------
+ * | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
+ * ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
@@ -128,6 +132,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
+ private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;
private static final int MAX_SKIP_BUFFER_SIZE = 2048;
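For orientation, a minimal sketch (illustrative only, not part of the patch) of how the sixth attribute bit is encoded and decoded; the mask values are copied from the constants above, and decoding follows the same pattern as isTransactional()/isControlBatch():

    public class AttributesBitSketch {
        // Mask values copied from DefaultRecordBatch above.
        private static final byte COMPRESSION_CODEC_MASK = 0x07;
        private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;

        public static void main(String[] args) {
            short attributes = 0;
            attributes |= COMPRESSION_CODEC_MASK & 2;   // compression type id occupies bits 0-2
            attributes |= DELETE_HORIZON_FLAG_MASK;     // bit 6: base timestamp is the delete horizon

            System.out.println((attributes & DELETE_HORIZON_FLAG_MASK) > 0); // prints "true"
        }
    }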
@@ -155,13 +160,27 @@ public void ensureValid() {
}
/**
- * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+ * Get the base timestamp of the batch, which is used to calculate the record timestamp deltas.
+ *
+ * @return The base timestamp, or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+ */
+ public long baseTimestamp() {
+ return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
+ }
+
+ /**
+ * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
* timestamp type of the batch is log append time.
- *
- * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+ *
+ * @return The first timestamp, if a record has been appended; {@link RecordBatch#NO_TIMESTAMP}
+ * if the batch is empty or if the base timestamp has been overwritten with the delete horizon
*/
public long firstTimestamp() {
- return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
+ if (hasDeleteHorizonMs())
+ return RecordBatch.NO_TIMESTAMP;
+ return baseTimestamp();
}
@Override
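The net effect is that baseTimestamp() always exposes the raw field, while firstTimestamp() hides it once the flag is set. A hedged sketch of the resulting contract (the helper method and assertions are illustrative only):

    // Illustrative helper, not part of the patch; run with -ea to enable assertions.
    static void checkTimestampContract(DefaultRecordBatch batch) {
        if (batch.hasDeleteHorizonMs()) {
            // The raw field now stores the delete horizon, so the accessor hides it.
            assert batch.firstTimestamp() == RecordBatch.NO_TIMESTAMP;
            assert batch.deleteHorizonMs() == batch.baseTimestamp();
        } else {
            // The raw field is still the first record's timestamp.
            assert batch.firstTimestamp() == batch.baseTimestamp();
            assert batch.deleteHorizonMs() == RecordBatch.NO_TIMESTAMP;
        }
    }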
@@ -245,6 +264,19 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}
+ @Override
+ public boolean hasDeleteHorizonMs() {
+ return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+ }
+
+ @Override
+ public long deleteHorizonMs() {
+ if (hasDeleteHorizonMs())
+ return baseTimestamp();
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
@@ -360,7 +392,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;
- byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
+ byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), hasDeleteHorizonMs());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
@@ -407,7 +439,7 @@ public int hashCode() {
}
private static byte computeAttributes(CompressionType type, TimestampType timestampType,
- boolean isTransactional, boolean isControl) {
+ boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
@@ -419,6 +451,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
+ if (isDeleteHorizonSet)
+ attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}
@@ -436,8 +470,8 @@ public static void writeEmptyHeader(ByteBuffer buffer,
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
- CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
- producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
+ CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
+ producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}
static void writeHeader(ByteBuffer buffer,
@@ -454,6 +488,7 @@ static void writeHeader(ByteBuffer buffer,
int sequence,
boolean isTransactional,
boolean isControlBatch,
+ boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
@@ -461,7 +496,7 @@ static void writeHeader(ByteBuffer buffer,
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
- short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
+ short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
@@ -699,6 +734,18 @@ public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}
+ @Override
+ public boolean hasDeleteHorizonMs() {
+ return loadBatchHeader().hasDeleteHorizonMs();
+ }
+
+ @Override
+ public long deleteHorizonMs() {
+ return loadBatchHeader().deleteHorizonMs();
+ }
+
@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 8f73565d1b40f..53d38318c431a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
@@ -150,15 +151,22 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte
return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
}
+ /**
+ * Note: besides filtering, this method also converts the base timestamp of a batch (normally the timestamp of
+ * its first record) into the delete horizon for any tombstones or transaction markers the batch contains.
+ */
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
BufferSupplier decompressionBufferSupplier) {
FilterResult filterResult = new FilterResult(destinationBuffer);
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
-
for (MutableRecordBatch batch : batches) {
long maxOffset = -1L;
- BatchRetention batchRetention = filter.checkBatchRetention(batch);
+
+ final BatchRetentionResult batchRetentionResult = filter.checkBatchRetention(batch);
+ final boolean containsMarkerForEmptyTxn = batchRetentionResult.containsMarkerForEmptyTxn;
+ final BatchRetention batchRetention = batchRetentionResult.batchRetention;
+
filterResult.bytesRead += batch.sizeInBytes();
if (batchRetention == BatchRetention.DELETE)
@@ -168,38 +176,40 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
 byte batchMagic = batch.magic();
- boolean writeOriginalBatch = true;
 List<Record> retainedRecords = new ArrayList<>();
- try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
- while (iterator.hasNext()) {
- Record record = iterator.next();
- filterResult.messagesRead += 1;
-
- if (filter.shouldRetainRecord(batch, record)) {
- // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
- // the corrupted batch with correct data.
- if (!record.hasMagic(batchMagic))
- writeOriginalBatch = false;
-
- if (record.offset() > maxOffset)
- maxOffset = record.offset();
-
- retainedRecords.add(record);
- } else {
- writeOriginalBatch = false;
- }
- }
- }
+ final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+ batchMagic, true, maxOffset, retainedRecords);
+ boolean containsTombstones = iterationResult.containsTombstones();
+ boolean writeOriginalBatch = iterationResult.shouldWriteOriginalBatch();
+ maxOffset = iterationResult.maxOffset();
if (!retainedRecords.isEmpty()) {
- if (writeOriginalBatch) {
+ // check whether the delete horizon should be set to a new value, in which case the base
+ // timestamp must be reset and the timestamp deltas rewritten; if the batch contains neither
+ // tombstones nor a marker for an empty transaction, the batch does not need to be rewritten
+ boolean needToSetDeleteHorizon = batch.magic() >= 2 && (containsTombstones || containsMarkerForEmptyTxn)
+ && !batch.hasDeleteHorizonMs();
+ if (writeOriginalBatch && !needToSetDeleteHorizon) {
+ if (batch.deleteHorizonMs() > filterResult.latestDeleteHorizon())
+ filterResult.updateLatestDeleteHorizon(batch.deleteHorizonMs());
batch.writeTo(bufferOutputStream);
filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
} else {
- MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+ final long deleteHorizonMs = needToSetDeleteHorizon
+ ? filter.currentTime + filter.deleteRetentionMs
+ : batch.deleteHorizonMs();
+ final MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+ if (deleteHorizonMs > filterResult.latestDeleteHorizon())
+ filterResult.updateLatestDeleteHorizon(deleteHorizonMs);
+
MemoryRecords records = builder.build();
int filteredBatchSize = records.sizeInBytes();
if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
@@ -236,9 +246,68 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
+ private static BatchFilterResult filterBatch(RecordBatch batch,
+ BufferSupplier decompressionBufferSupplier,
+ FilterResult filterResult,
+ RecordFilter filter,
+ byte batchMagic,
+ boolean writeOriginalBatch,
+ long maxOffset,
+ List<Record> retainedRecords) {
+ boolean containsTombstones = false;
+ try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ filterResult.messagesRead += 1;
+
+ if (filter.shouldRetainRecord(batch, record)) {
+ // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+ // the corrupted batch with correct data.
+ if (!record.hasMagic(batchMagic))
+ writeOriginalBatch = false;
+
+ if (record.offset() > maxOffset)
+ maxOffset = record.offset();
+
+ retainedRecords.add(record);
+
+ if (!record.hasValue()) {
+ containsTombstones = true;
+ }
+ } else {
+ writeOriginalBatch = false;
+ }
+ }
+ return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
+ }
+ }
+
+ private static class BatchFilterResult {
+ private final boolean writeOriginalBatch;
+ private final boolean containsTombstones;
+ private final long maxOffset;
+ public BatchFilterResult(final boolean writeOriginalBatch,
+ final boolean containsTombstones,
+ final long maxOffset) {
+ this.writeOriginalBatch = writeOriginalBatch;
+ this.containsTombstones = containsTombstones;
+ this.maxOffset = maxOffset;
+ }
+ public boolean shouldWriteOriginalBatch() {
+ return this.writeOriginalBatch;
+ }
+ public boolean containsTombstones() {
+ return this.containsTombstones;
+ }
+ public long maxOffset() {
+ return this.maxOffset;
+ }
+ }
+
private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
List<Record> retainedRecords,
- ByteBufferOutputStream bufferOutputStream) {
+ ByteBufferOutputStream bufferOutputStream,
+ final long deleteHorizonMs) {
byte magic = originalBatch.magic();
TimestampType timestampType = originalBatch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
@@ -249,7 +318,7 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
- originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
+ originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs);
for (Record record : retainedRecords)
builder.append(record);
@@ -300,6 +369,24 @@ public int hashCode() {
}
public static abstract class RecordFilter {
+ public final long currentTime;
+ public final long deleteRetentionMs;
+
+ public RecordFilter(final long currentTime, final long deleteRetentionMs) {
+ this.currentTime = currentTime;
+ this.deleteRetentionMs = deleteRetentionMs;
+ }
+
+ public static class BatchRetentionResult {
+ public final BatchRetention batchRetention;
+ public final boolean containsMarkerForEmptyTxn;
+ public BatchRetentionResult(final BatchRetention batchRetention,
+ final boolean containsMarkerForEmptyTxn) {
+ this.batchRetention = batchRetention;
+ this.containsMarkerForEmptyTxn = containsMarkerForEmptyTxn;
+ }
+ }
+
public enum BatchRetention {
DELETE, // Delete the batch without inspecting records
RETAIN_EMPTY, // Retain the batch even if it is empty
@@ -310,7 +397,7 @@ public enum BatchRetention {
* Check whether the full batch can be discarded (i.e. whether we even need to
* check the records individually).
*/
- protected abstract BatchRetention checkBatchRetention(RecordBatch batch);
+ protected abstract BatchRetentionResult checkBatchRetention(RecordBatch batch);
/**
* Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)}
@@ -331,11 +418,20 @@ public static class FilterResult {
private long maxOffset = -1L;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
private long shallowOffsetOfMaxTimestamp = -1L;
+ private long latestDeleteHorizonMs = RecordBatch.NO_TIMESTAMP;
private FilterResult(ByteBuffer outputBuffer) {
this.outputBuffer = outputBuffer;
}
+ public void updateLatestDeleteHorizon(long deleteHorizon) {
+ this.latestDeleteHorizonMs = deleteHorizon;
+ }
+
+ public long latestDeleteHorizon() {
+ return latestDeleteHorizonMs;
+ }
+
private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) {
int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes();
updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(),
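Putting the MemoryRecords changes together: a filter now carries the current clock value and the retention window through the RecordFilter constructor, and the horizon stamped into a rewritten batch is simply their sum. A hedged usage sketch (the time and retention values are hypothetical, and the usual org.apache.kafka.common.record imports are assumed):

    // Hypothetical values for illustration.
    long now = System.currentTimeMillis();
    long deleteRetentionMs = 86_400_000L; // e.g. a topic's delete.retention.ms of one day

    MemoryRecords.RecordFilter filter = new MemoryRecords.RecordFilter(now, deleteRetentionMs) {
        @Override
        protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
            // Inspect records individually; this sketch reports no empty-transaction marker.
            return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
        }

        @Override
        protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
            return true; // retain everything, including tombstones
        }
    };
    // If a rewritten v2 batch still contains tombstones (or an empty-transaction marker) and has
    // no horizon yet, filterTo stamps its base timestamp with now + deleteRetentionMs and sets
    // the delete horizon flag.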
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 054fb86199884..66dc654f0426d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -75,6 +75,7 @@ public void write(int b) {
private int numRecords = 0;
private float actualCompressionRatio = 1;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+ private long deleteHorizonMs;
private long offsetOfMaxTimestamp = -1;
private Long lastOffset = null;
private Long firstTimestamp = null;
@@ -94,7 +95,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
- int writeLimit) {
+ int writeLimit,
+ long deleteHorizonMs) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
if (magic < RecordBatch.MAGIC_VALUE_V2) {
@@ -120,6 +122,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
this.isControlBatch = isControlBatch;
+ this.deleteHorizonMs = deleteHorizonMs;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
@@ -128,6 +131,28 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
+
+ if (hasDeleteHorizonMs()) {
+ this.firstTimestamp = deleteHorizonMs;
+ }
+ }
+
+ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime,
+ long producerId,
+ short producerEpoch,
+ int baseSequence,
+ boolean isTransactional,
+ boolean isControlBatch,
+ int partitionLeaderEpoch,
+ int writeLimit) {
+ this(bufferStream, magic, compressionType, timestampType, baseOffset, logAppendTime, producerId,
+ producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit,
+ RecordBatch.NO_TIMESTAMP);
}
/**
@@ -192,6 +217,10 @@ public boolean isTransactional() {
return isTransactional;
}
+ public boolean hasDeleteHorizonMs() {
+ return magic >= RecordBatch.MAGIC_VALUE_V2 && deleteHorizonMs >= 0L;
+ }
+
/**
* Close this builder and return the resulting buffer.
* @return The built log buffer
@@ -365,7 +394,7 @@ private int writeDefaultBatchHeader() {
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
- partitionLeaderEpoch, numRecords);
+ hasDeleteHorizonMs(), partitionLeaderEpoch, numRecords);
buffer.position(pos);
return writtenCompressed;
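For completeness, the new constructor parameter in use; existing callers are unaffected because the old signature now delegates with RecordBatch.NO_TIMESTAMP. A sketch with illustrative values only:

    ByteBufferOutputStream stream = new ByteBufferOutputStream(ByteBuffer.allocate(1024));
    long deleteHorizonMs = 1_700_000_000_000L; // hypothetical horizon timestamp

    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(stream, RecordBatch.CURRENT_MAGIC_VALUE,
        CompressionType.NONE, TimestampType.CREATE_TIME, 0L /* baseOffset */,
        RecordBatch.NO_TIMESTAMP /* logAppendTime */, RecordBatch.NO_PRODUCER_ID,
        RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false /* isTransactional */,
        false /* isControlBatch */, RecordBatch.NO_PARTITION_LEADER_EPOCH,
        stream.limit() /* writeLimit */, deleteHorizonMs);

    // magic v2 plus a non-negative horizon: the flag is on and firstTimestamp is pinned to it.
    assert builder.hasDeleteHorizonMs();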
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 65a6a95fbe41f..af65ebaf9ad18 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -210,6 +210,18 @@ public interface RecordBatch extends Iterable<Record> {
*/
boolean isTransactional();
+ /**
+ * Whether the base timestamp of the batch has been set to the delete horizon.
+ * @return true if the delete horizon flag is set, false otherwise
+ */
+ boolean hasDeleteHorizonMs();
+
+ /**
+ * Get the delete horizon.
+ * @return The timestamp of the delete horizon, or {@link #NO_TIMESTAMP} if the base timestamp is not the delete horizon
+ */
+ long deleteHorizonMs();
+
/**
* Get the partition leader epoch of this record batch.
* @return The leader epoch or -1 if it is unknown
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 041b8191f3f78..74409802118f3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2895,10 +2895,10 @@ public void testUpdatePositionWithLastRecordMissingFromBatch() {
new SimpleRecord(null, "value".getBytes()));
// Remove the last record to simulate compaction
- MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter() {
+ MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
- return BatchRetention.DELETE_EMPTY;
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+ return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index b8824d3a8276c..db6cb67e1d777 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@@ -247,6 +248,41 @@ public void testFilterToPreservesPartitionLeaderEpoch() {
}
}
+ /**
+ * Verifies that filtering converts the first timestamp of a batch that contains
+ * tombstones / transaction markers into the delete horizon.
+ */
+ @Test
+ public void testFirstTimestampToDeleteHorizonConversion() {
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME,
+ 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+ builder.append(10L, "1".getBytes(), null);
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ final long deleteHorizon = Integer.MAX_VALUE / 2;
+ final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+ @Override
+ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+ return true;
+ }
+
+ @Override
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+ return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, true);
+ }
+ };
+ builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+ filtered.flip();
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+ List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+ assertEquals(1, batches.size());
+ assertEquals(deleteHorizon, batches.get(0).deleteHorizonMs());
+ }
+ }
+
@Test
public void testFilterToEmptyBatchRetention() {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
@@ -269,11 +305,11 @@ public void testFilterToEmptyBatchRetention() {
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
- new MemoryRecords.RecordFilter() {
+ new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// retain all batches
- return BatchRetention.RETAIN_EMPTY;
+ return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
}
@Override
@@ -331,11 +367,11 @@ public void testEmptyBatchRetention() {
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords records = MemoryRecords.readableRecords(buffer);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
- new MemoryRecords.RecordFilter() {
+ new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// retain all batches
- return BatchRetention.RETAIN_EMPTY;
+ return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
}
@Override
@@ -381,10 +417,10 @@ public void testEmptyBatchDeletion() {
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords records = MemoryRecords.readableRecords(buffer);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
- new MemoryRecords.RecordFilter() {
+ new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
- return deleteRetention;
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+ return new BatchRetentionResult(deleteRetention, false);
}
@Override
@@ -469,13 +505,13 @@ public void testFilterToBatchDiscard() {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
+ MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// discard the second and fourth batches
if (batch.lastOffset() == 2L || batch.lastOffset() == 6L)
- return BatchRetention.DELETE;
- return BatchRetention.DELETE_EMPTY;
+ return new BatchRetentionResult(BatchRetention.DELETE, false);
+ return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override
@@ -909,9 +945,13 @@ public static Collection