diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 13cfdb82bd0a8..8b28fcadb17c3 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -107,7 +107,7 @@
-
+
@@ -124,7 +124,7 @@
-
+
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 83637640af49d..cf74f24f490f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -431,6 +431,16 @@ public LegacyRecord outerRecord() {
return record;
}
+ @Override
+ public long deleteHorizonMs() {
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean deleteHorizonSet() {
+ return false;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -468,6 +478,16 @@ public long offset() {
return buffer.getLong(OFFSET_OFFSET);
}
+ @Override
+ public long deleteHorizonMs() {
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean deleteHorizonSet() {
+ return false;
+ }
+
@Override
public LegacyRecord outerRecord() {
return record;
@@ -557,6 +577,16 @@ public long baseOffset() {
return loadFullBatch().baseOffset();
}
+ @Override
+ public long deleteHorizonMs() {
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean deleteHorizonSet() {
+ return false;
+ }
+
@Override
public long lastOffset() {
return offset;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 6d79b268575ab..e0d7549fedebc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -89,11 +89,15 @@
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
+ * The delete horizon flag in the sixth bit is used to determine whether the first timestamp of the batch has been set
+ * to the time at which tombstones / transaction markers need to be removed. If the flag is true, the first timestamp
+ * is the delete horizon; otherwise, it is merely the first timestamp of the record batch.
+ *
* The current attributes are given below:
*
- * -------------------------------------------------------------------------------------------------
- * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
- * -------------------------------------------------------------------------------------------------
+ * ---------------------------------------------------------------------------------------------------------------------------
+ * | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
+ * ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
@@ -128,6 +132,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
+ private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;
private static final int MAX_SKIP_BUFFER_SIZE = 2048;
@@ -155,10 +160,15 @@ public void ensureValid() {
}
/**
- * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+ * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
* timestamp type of the batch is log append time.
*
- * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+ * It is possible that the first timestamp has been set to the delete horizon of the batch,
+ * in which case the delete horizon is returned instead.
+ *
+ * @return The first timestamp, if the batch's delete horizon has not been set;
+ * the delete horizon, if the batch's delete horizon has been set;
+ * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
*/
public long firstTimestamp() {
return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
@@ -245,6 +255,18 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}
+ @Override
+ public boolean deleteHorizonSet() {
+ return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+ }
+
+ @Override
+ public long deleteHorizonMs() {
+ if (deleteHorizonSet())
+ return firstTimestamp();
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
@@ -360,7 +382,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;
- byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
+ byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), deleteHorizonSet());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
@@ -407,7 +429,7 @@ public int hashCode() {
}
private static byte computeAttributes(CompressionType type, TimestampType timestampType,
- boolean isTransactional, boolean isControl) {
+ boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
@@ -419,6 +441,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
+ if (isDeleteHorizonSet)
+ attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}
@@ -435,9 +459,49 @@ public static void writeEmptyHeader(ByteBuffer buffer,
boolean isTransactional,
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
+ writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
+ CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
+ producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
+ }
+
+ public static void writeEmptyHeader(ByteBuffer buffer,
+ byte magic,
+ long producerId,
+ short producerEpoch,
+ int baseSequence,
+ long baseOffset,
+ long lastOffset,
+ int partitionLeaderEpoch,
+ TimestampType timestampType,
+ long timestamp,
+ boolean isTransactional,
+ boolean isControlRecord,
+ boolean isDeleteHorizonSet) {
+ int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
- producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
+ producerEpoch, baseSequence, isTransactional, isControlRecord, isDeleteHorizonSet, partitionLeaderEpoch, 0);
+ }
+
+ static void writeHeader(ByteBuffer buffer,
+ long baseOffset,
+ int lastOffsetDelta,
+ int sizeInBytes,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long firstTimestamp,
+ long maxTimestamp,
+ long producerId,
+ short epoch,
+ int sequence,
+ boolean isTransactional,
+ boolean isControlBatch,
+ int partitionLeaderEpoch,
+ int numRecords) {
+ writeHeader(buffer, baseOffset, lastOffsetDelta, sizeInBytes, magic, compressionType,
+ timestampType, firstTimestamp, maxTimestamp, producerId, epoch, sequence,
+ isTransactional, isControlBatch, false, partitionLeaderEpoch, numRecords);
}
static void writeHeader(ByteBuffer buffer,
@@ -454,6 +518,7 @@ static void writeHeader(ByteBuffer buffer,
int sequence,
boolean isTransactional,
boolean isControlBatch,
+ boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
@@ -461,7 +526,7 @@ static void writeHeader(ByteBuffer buffer,
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
- short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
+ short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
@@ -699,6 +764,18 @@ public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}
+ @Override
+ public boolean deleteHorizonSet() {
+ return loadBatchHeader().deleteHorizonSet();
+ }
+
+ @Override
+ public long deleteHorizonMs() {
+ if (deleteHorizonSet())
+ return super.loadBatchHeader().deleteHorizonMs();
+ return RecordBatch.NO_TIMESTAMP;
+ }
+
@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 8f73565d1b40f..7fb5fe951b9f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -150,15 +150,29 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte
return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
}
+ /**
+ * Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record)
+ * to the delete horizon of the tombstones which are present in the batch.
+ */
private static FilterResult filterTo(TopicPartition partition, Iterable batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
BufferSupplier decompressionBufferSupplier) {
FilterResult filterResult = new FilterResult(destinationBuffer);
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
-
for (MutableRecordBatch batch : batches) {
long maxOffset = -1L;
- BatchRetention batchRetention = filter.checkBatchRetention(batch);
+ // we call this method first so that the flag in LogCleaner is set,
+ // which indicates whether the control batch is empty.
+ // we do this to avoid calling CleanedTransactionMetadata#onControlBatchRead
+ // more than once, since each call is relatively expensive
+ filter.isControlBatchEmpty(batch);
+ long deleteHorizonMs = filter.retrieveDeleteHorizon(batch);
+ final BatchRetention batchRetention;
+ if (!batch.deleteHorizonSet())
+ batchRetention = filter.checkBatchRetention(batch, deleteHorizonMs);
+ else
+ batchRetention = filter.checkBatchRetention(batch);
+
filterResult.bytesRead += batch.sizeInBytes();
if (batchRetention == BatchRetention.DELETE)
@@ -170,36 +184,27 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords = new ArrayList<>();
- try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) {
- while (iterator.hasNext()) {
- Record record = iterator.next();
- filterResult.messagesRead += 1;
-
- if (filter.shouldRetainRecord(batch, record)) {
- // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
- // the corrupted batch with correct data.
- if (!record.hasMagic(batchMagic))
- writeOriginalBatch = false;
-
- if (record.offset() > maxOffset)
- maxOffset = record.offset();
-
- retainedRecords.add(record);
- } else {
- writeOriginalBatch = false;
- }
- }
- }
+ final BatchIterationResult iterationResult = iterateOverBatch(batch, decompressionBufferSupplier, filterResult, filter,
+ batchMagic, writeOriginalBatch, maxOffset, retainedRecords,
+ containsTombstonesOrMarker, deleteHorizonMs);
+ containsTombstonesOrMarker = iterationResult.containsTombstonesOrMarker();
+ writeOriginalBatch = iterationResult.shouldWriteOriginalBatch();
+ maxOffset = iterationResult.maxOffset();
if (!retainedRecords.isEmpty()) {
- if (writeOriginalBatch) {
+ // we check if the delete horizon should be set to a new value,
+ // in which case we need to reset the base timestamp and overwrite the timestamp deltas;
+ // if the batch does not contain tombstones, then we don't need to overwrite the batch
+ if (writeOriginalBatch && (deleteHorizonMs == RecordBatch.NO_TIMESTAMP || deleteHorizonMs == batch.deleteHorizonMs() || !containsTombstonesOrMarker)) {
batch.writeTo(bufferOutputStream);
filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
} else {
- MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+ MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
MemoryRecords records = builder.build();
int filteredBatchSize = records.sizeInBytes();
if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
@@ -236,9 +241,69 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords,
+ boolean containsTombstonesOrMarker,
+ long newBatchDeleteHorizonMs) {
+ try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ filterResult.messagesRead += 1;
+
+ if (filter.shouldRetainRecord(batch, record, newBatchDeleteHorizonMs)) {
+ // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+ // the corrupted batch with correct data.
+ if (!record.hasMagic(batchMagic))
+ writeOriginalBatch = false;
+
+ if (record.offset() > maxOffset)
+ maxOffset = record.offset();
+
+ retainedRecords.add(record);
+
+ if (!record.hasValue()) {
+ containsTombstonesOrMarker = true;
+ }
+ } else {
+ writeOriginalBatch = false;
+ }
+ }
+ return new BatchIterationResult(writeOriginalBatch, containsTombstonesOrMarker, maxOffset);
+ }
+ }
+
+ private static class BatchIterationResult {
+ private final boolean writeOriginalBatch;
+ private final boolean containsTombstonesOrMarker;
+ private final long maxOffset;
+ public BatchIterationResult(final boolean writeOriginalBatch,
+ final boolean containsTombstonesOrMarker,
+ final long maxOffset) {
+ this.writeOriginalBatch = writeOriginalBatch;
+ this.containsTombstonesOrMarker = containsTombstonesOrMarker;
+ this.maxOffset = maxOffset;
+ }
+ public boolean shouldWriteOriginalBatch() {
+ return this.writeOriginalBatch;
+ }
+ public boolean containsTombstonesOrMarker() {
+ return this.containsTombstonesOrMarker;
+ }
+ public long maxOffset() {
+ return this.maxOffset;
+ }
+ }
+
private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
List retainedRecords,
- ByteBufferOutputStream bufferOutputStream) {
+ ByteBufferOutputStream bufferOutputStream,
+ final long deleteHorizonMs) {
byte magic = originalBatch.magic();
TimestampType timestampType = originalBatch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
@@ -249,7 +314,7 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
- originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
+ originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs);
for (Record record : retainedRecords)
builder.append(record);
@@ -312,12 +377,34 @@ public enum BatchRetention {
*/
protected abstract BatchRetention checkBatchRetention(RecordBatch batch);
+ protected BatchRetention checkBatchRetention(RecordBatch batch, long newBatchDeleteHorizonMs) {
+ return checkBatchRetention(batch);
+ }
+
/**
* Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)}
* is used prior to checking individual record retention. Only records from batches which were not
* explicitly discarded with {@link BatchRetention#DELETE} will be considered.
*/
protected abstract boolean shouldRetainRecord(RecordBatch recordBatch, Record record);
+
+ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record, long newDeleteHorizonMs) {
+ return shouldRetainRecord(recordBatch, record);
+ }
+
+ /**
+ * Retrieves the delete horizon (in ms) for a specific batch; the default implementation returns -1L (no horizon)
+ */
+ protected long retrieveDeleteHorizon(RecordBatch recordBatch) {
+ return -1L;
+ }
+
+ /**
+ * Checks whether the control batch (if the batch is one) can be removed, i.e. verifies that it is empty
+ */
+ protected boolean isControlBatchEmpty(RecordBatch recordBatch) {
+ return true;
+ }
}
public static class FilterResult {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 054fb86199884..be32a83905935 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -75,6 +75,7 @@ public void write(int b) {
private int numRecords = 0;
private float actualCompressionRatio = 1;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+ private long deleteHorizonMs;
private long offsetOfMaxTimestamp = -1;
private Long lastOffset = null;
private Long firstTimestamp = null;
@@ -94,7 +95,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
- int writeLimit) {
+ int writeLimit,
+ long deleteHorizonMs) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
if (magic < RecordBatch.MAGIC_VALUE_V2) {
@@ -120,6 +122,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
this.isControlBatch = isControlBatch;
+ this.deleteHorizonMs = deleteHorizonMs;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
@@ -130,6 +133,24 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
+ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime,
+ long producerId,
+ short producerEpoch,
+ int baseSequence,
+ boolean isTransactional,
+ boolean isControlBatch,
+ int partitionLeaderEpoch,
+ int writeLimit) {
+ this(bufferStream, magic, compressionType, timestampType, baseOffset, logAppendTime, producerId,
+ producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit,
+ RecordBatch.NO_TIMESTAMP);
+ }
+
/**
* Construct a new builder.
*
@@ -192,6 +213,10 @@ public boolean isTransactional() {
return isTransactional;
}
+ public boolean deleteHorizonSet() {
+ return deleteHorizonMs >= 0L;
+ }
+
/**
* Close this builder and return the resulting buffer.
* @return The built log buffer
@@ -364,7 +389,7 @@ private int writeDefaultBatchHeader() {
maxTimestamp = this.maxTimestamp;
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
- firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
+ firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, deleteHorizonSet(),
partitionLeaderEpoch, numRecords);
buffer.position(pos);
@@ -411,8 +436,12 @@ private Long appendWithOffset(long offset, boolean isControlRecord, long timesta
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
- if (firstTimestamp == null)
- firstTimestamp = timestamp;
+ if (firstTimestamp == null) {
+ if (deleteHorizonSet())
+ firstTimestamp = deleteHorizonMs;
+ else
+ firstTimestamp = timestamp;
+ }
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 65a6a95fbe41f..45f1609e3bc29 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -210,6 +210,18 @@ public interface RecordBatch extends Iterable {
*/
boolean isTransactional();
+ /**
+ * Whether the base timestamp of this batch has been set to the delete horizon.
+ * @return true if the base timestamp is the delete horizon, false otherwise
+ */
+ boolean deleteHorizonSet();
+
+ /**
+ * Get the delete horizon; returns -1L if the first timestamp is not the delete horizon
+ * @return the timestamp of the delete horizon, or -1L if it has not been set
+ */
+ long deleteHorizonMs();
+
/**
* Get the partition leader epoch of this record batch.
* @return The leader epoch or -1 if it is unknown
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index b8824d3a8276c..4677e40e388e1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -247,6 +247,45 @@ public void testFilterToPreservesPartitionLeaderEpoch() {
}
}
+ /**
+ * This test verifies that the first timestamp of the batch has been successfully
+ * converted to a delete horizon for the tombstones / transaction markers of the batch.
+ */
+ @Test
+ public void testFirstTimestampToDeleteHorizonConversion() {
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME,
+ 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+ builder.append(10L, "1".getBytes(), null);
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ final long deleteHorizon = Integer.MAX_VALUE / 2;
+ builder.build().filterTo(new TopicPartition("random", 0), new MemoryRecords.RecordFilter() {
+ @Override
+ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+ return true;
+ }
+
+ @Override
+ protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ return BatchRetention.RETAIN_EMPTY;
+ }
+
+ @Override
+ protected long retrieveDeleteHorizon(RecordBatch batch) {
+ return deleteHorizon; // arbitrary value > 1
+ }
+ }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+ filtered.flip();
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+ List batches = TestUtils.toList(filteredRecords.batches());
+ assertEquals(1, batches.size());
+ assertEquals(deleteHorizon, batches.get(0).deleteHorizonMs());
+ }
+ }
+
@Test
public void testFilterToEmptyBatchRetention() {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5e04c3cb55c97..9e221ffb19a33 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -221,7 +221,8 @@ class Log(@volatile var dir: File,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
- logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
+ logDirFailureChannel: LogDirFailureChannel,
+ @volatile var containsTombstones: Boolean = false) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 219f49716f065..a59dfadb1478c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -509,11 +509,10 @@ private[log] class Cleaner(val id: Int,
case None => 0L
case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
}
-
- doClean(cleanable, deleteHorizonMs)
+ doClean(cleanable, time.milliseconds(), trackedHorizon = deleteHorizonMs)
}
- private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
+ private[log] def doClean(cleanable: LogToClean, currentTime: Long, trackedHorizon: Long = -1L): (Long, CleanerStats) = {
info("Beginning cleaning of log %s.".format(cleanable.log.name))
val log = cleanable.log
@@ -531,13 +530,13 @@ private[log] class Cleaner(val id: Int,
val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
// group the segments and clean the groups
- info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
+ info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(trackedHorizon)))
val transactionMetadata = new CleanedTransactionMetadata
val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
log.config.maxIndexSize, cleanable.firstUncleanableOffset)
for (group <- groupedSegments)
- cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
+ cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, trackedHorizon = trackedHorizon)
// record buffer utilization
stats.bufferUtilization = offsetMap.utilization
@@ -561,9 +560,10 @@ private[log] class Cleaner(val id: Int,
private[log] def cleanSegments(log: Log,
segments: Seq[LogSegment],
map: OffsetMap,
- deleteHorizonMs: Long,
+ currentTime: Long,
stats: CleanerStats,
- transactionMetadata: CleanedTransactionMetadata): Unit = {
+ transactionMetadata: CleanedTransactionMetadata,
+ trackedHorizon: Long = -1L): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
@@ -583,14 +583,15 @@ private[log] class Cleaner(val id: Int,
val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
transactionMetadata.addAbortedTransactions(abortedTransactions)
- val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+ val retainDeletesAndTxnMarkers = currentSegment.lastModified > trackedHorizon
info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
- s"with deletion horizon $deleteHorizonMs, " +
+ s"with deletion horizon $trackedHorizon, " +
s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
try {
- cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
- transactionMetadata, lastOffsetOfActiveProducers, stats)
+ val containsTombstones: Boolean = cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
+ transactionMetadata, lastOffsetOfActiveProducers, stats, log.config.deleteRetentionMs, currentTime = currentTime)
+ log.containsTombstones = containsTombstones
} catch {
case e: LogSegmentOffsetOverflowException =>
// Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@@ -634,6 +635,7 @@ private[log] class Cleaner(val id: Int,
* @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param stats Collector for cleaning statistics
+ * @param tombstoneRetentionMs How long a tombstone should be retained, per the log configuration
*/
private[log] def cleanInto(topicPartition: TopicPartition,
sourceRecords: FileRecords,
@@ -643,14 +645,33 @@ private[log] class Cleaner(val id: Int,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: Map[Long, LastRecord],
- stats: CleanerStats): Unit = {
+ stats: CleanerStats,
+ tombstoneRetentionMs: Long,
+ currentTime: Long = RecordBatch.NO_TIMESTAMP): Boolean = {
+ var containsTombstones: Boolean = false
+
val logCleanerFilter: RecordFilter = new RecordFilter {
var discardBatchRecords: Boolean = _
+ var isControlBatchEmpty: Boolean = _
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+ override def isControlBatchEmpty(batch: RecordBatch) : Boolean = {
// we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
// note that we will never delete a marker until all the records from that transaction are removed.
- discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+ val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+ isControlBatchEmpty = canDiscardBatch
+ isControlBatchEmpty
+ }
+
+ override def checkBatchRetention(batch: RecordBatch, newBatchDeleteHorizonMs : Long): BatchRetention = {
+ val canDiscardBatch = isControlBatchEmpty
+
+ if (batch.isControlBatch) {
+ discardBatchRecords = canDiscardBatch &&
+ ((batch.deleteHorizonSet() && batch.deleteHorizonMs() < currentTime) ||
+ newBatchDeleteHorizonMs != RecordBatch.NO_TIMESTAMP && newBatchDeleteHorizonMs < currentTime)
+ } else {
+ discardBatchRecords = canDiscardBatch
+ }
def isBatchLastRecordOfProducer: Boolean = {
// We retain the batch in order to preserve the state of active producers. There are three cases:
@@ -675,12 +696,33 @@ private[log] class Cleaner(val id: Int,
BatchRetention.DELETE_EMPTY
}
- override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+ override def checkBatchRetention(batch: RecordBatch): BatchRetention = checkBatchRetention(batch, batch.deleteHorizonMs())
+
+ override def shouldRetainRecord(batch: RecordBatch, record: Record, newDeleteHorizonMs: Long): Boolean = {
+ var isRecordRetained: Boolean = true
if (discardBatchRecords)
// The batch is only retained to preserve producer sequence information; the records can be removed
- false
+ isRecordRetained = false
else
- Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats)
+ isRecordRetained = Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats, newDeleteHorizonMs, currentTime = currentTime)
+ if (isRecordRetained && !record.hasValue())
+ containsTombstones = true
+ isRecordRetained
+ }
+
+ override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+ shouldRetainRecord(batch, record, RecordBatch.NO_TIMESTAMP)
+ }
+
+ override def retrieveDeleteHorizon(batch: RecordBatch) : Long = {
+ if (batch.deleteHorizonSet())
+ return batch.deleteHorizonMs() // means that we keep the old timestamp stored
+
+ // check that the control batch has been emptied of records
+ // if not, then we do not set a delete horizon until that is true
+ if (batch.isControlBatch() && !isControlBatchEmpty)
+ return -1L
+ return time.milliseconds() + tombstoneRetentionMs;
}
}
@@ -720,6 +762,7 @@ private[log] class Cleaner(val id: Int,
growBuffersOrFail(sourceRecords, position, maxLogMessageSize, records)
}
restoreBuffers()
+ containsTombstones
}
@@ -758,20 +801,19 @@ private[log] class Cleaner(val id: Int,
private def shouldDiscardBatch(batch: RecordBatch,
transactionMetadata: CleanedTransactionMetadata,
retainTxnMarkers: Boolean): Boolean = {
- if (batch.isControlBatch) {
- val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch)
- canDiscardControlBatch && !retainTxnMarkers
- } else {
- val canDiscardBatch = transactionMetadata.onBatchRead(batch)
- canDiscardBatch
- }
+ if (batch.isControlBatch)
+ transactionMetadata.onControlBatchRead(batch)
+ else
+ transactionMetadata.onBatchRead(batch)
}
private def shouldRetainRecord(map: kafka.log.OffsetMap,
retainDeletes: Boolean,
batch: RecordBatch,
record: Record,
- stats: CleanerStats): Boolean = {
+ stats: CleanerStats,
+ newBatchDeleteHorizonMs: Long,
+ currentTime: Long = -1L): Boolean = {
val pastLatestOffset = record.offset > map.latestOffset
if (pastLatestOffset)
return true
@@ -785,7 +827,15 @@ private[log] class Cleaner(val id: Int,
* 2) The message doesn't has value but it can't be deleted now.
*/
val latestOffsetForKey = record.offset() >= foundOffset
- val isRetainedValue = record.hasValue || retainDeletes
+ val isLatestVersion = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
+ var shouldRetainDeletes = true
+ if (isLatestVersion)
+ shouldRetainDeletes = (batch.deleteHorizonSet() && currentTime < batch.deleteHorizonMs()) ||
+ (!batch.deleteHorizonSet() && currentTime < newBatchDeleteHorizonMs)
+ else
+ shouldRetainDeletes = retainDeletes
+ val isRetainedValue = record.hasValue || shouldRetainDeletes
+
latestOffsetForKey && isRetainedValue
} else {
stats.invalidMessage()
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index a5cfed5c0943e..6edd73530a955 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -180,6 +180,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case (_, log) => log.config.compact // match logs that are marked as compacted
}.filterNot {
case (topicPartition, log) =>
+
// skip any logs already in-progress and uncleanable partitions
inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
}.map {
@@ -202,7 +203,34 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
(ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
if(cleanableLogs.isEmpty) {
- None
+ // in this case, we are probably in a low throughput situation
+ // therefore, we should take advantage of this fact and remove tombstones if we can
+ val logsContainingTombstones = logs.filter {
+ case (_, log) => log.containsTombstones
+ }.filterNot {
+ case (topicPartition, log) =>
+ // skip any logs already in-progress and uncleanable partitions
+ inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
+ }.map {
+ case (topicPartition, log) => // create a LogToClean instance for each
+ try {
+ val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now)
+ val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
+ preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
+
+ LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
+ } catch {
+ case e: Throwable => throw new LogCleaningException(log,
+ s"Failed to calculate log cleaning stats for partition $topicPartition", e)
+ }
+ }
+        if (logsContainingTombstones.nonEmpty) {
+ val filthiest = logsContainingTombstones.max
+ inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
+ Some(filthiest)
+ } else {
+ None
+ }
} else {
preCleanStats.recordCleanablePartitions(cleanableLogs.size)
val filthiest = cleanableLogs.max
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index d148c3f89598b..aa908c4c78734 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,6 +18,7 @@
package kafka.log
import java.io.PrintWriter
+import java.util.Properties
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Gauge, MetricName}
@@ -178,6 +179,49 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
s"but lastCleaned=$lastCleaned2", lastCleaned2 >= secondBlockCleanableSegmentOffset)
}
+ @Test
+  def testTombstoneCleanWithLowThroughput(): Unit = {
+    val tombstoneRetentionMs = 1000 // 1 second, expressed in milliseconds
+
+ val topicPartitions = Array(new TopicPartition("log-partition", 0))
+ val props = new Properties()
+ props.put(LogConfig.DeleteRetentionMsProp, "1000")
+ cleaner = makeCleaner(partitions = topicPartitions, propertyOverrides = props)
+
+ val log = cleaner.logs.get(topicPartitions(0))
+
+ val T0 = time.milliseconds
+ writeKeyDups(numKeys = 1, numDups = 1, log, CompressionType.NONE, timestamp = T0,
+ startValue = 0, step = 1, isRecordTombstone = true)
+
+ val activeSegAtT0 = log.activeSegment
+
+ // roll the active segment
+ log.roll()
+
+ cleaner.startup()
+ Thread.sleep(100)
+
+ import JavaConverters._
+ var containsTombstones: Boolean = false
+ for (segment <- log.logSegments; record <- segment.log.records.asScala) {
+ containsTombstones = true
+ }
+ assertTrue(containsTombstones)
+ time.sleep(tombstoneRetentionMs + 1)
+
+ val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset
+
+ // the first block should get cleaned
+ cleaner.awaitCleaned(new TopicPartition("log-partition", 0),
+ firstBlockCleanableSegmentOffset, maxWaitMs = tombstoneRetentionMs * 3)
+
+ for (segment <- log.logSegments; record <- segment.log.records.asScala) {
+      fail("The log should not contain record " + record + ", as its tombstone has exceeded the retention time.")
+ }
+ assertFalse(log.containsTombstones)
+ }
+
private def readFromLog(log: Log): Iterable[(Int, Int)] = {
import JavaConverters._
for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
@@ -187,12 +231,17 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
}
}
- private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
+ private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long,
+ startValue: Int, step: Int, isRecordTombstone: Boolean = false): Seq[(Int, Int)] = {
var valCounter = startValue
for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val curValue = valCounter
- log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
- key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+ if (isRecordTombstone)
+ log.appendAsLeader(TestUtils.singletonRecords(value = null, codec = codec,
+ key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
+ else
+ log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec,
+ key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
valCounter += step
(key, curValue)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index bb30287bb12bd..7d03b07fd7010 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -347,7 +347,7 @@ class LogCleanerTest {
log.roll()
// cannot remove the marker in this pass because there are still valid records
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(1, 3, 2), LogTest.keysInLog(log))
assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
@@ -356,17 +356,17 @@ class LogCleanerTest {
log.roll()
// the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// delete horizon forced to 0 to verify marker is not removed early
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = 0L)._1
assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// clean again with large delete horizon and verify the marker is removed
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 1, 3), LogTest.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
}
@@ -395,11 +395,11 @@ class LogCleanerTest {
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
log.roll()
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(2), LogTest.keysInLog(log))
assertEquals(List(1, 3, 4), offsetsInLog(log))
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(2), LogTest.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
}
@@ -434,14 +434,14 @@ class LogCleanerTest {
// first time through the records are removed
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
@@ -454,13 +454,13 @@ class LogCleanerTest {
log.roll()
// Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
@@ -484,14 +484,14 @@ class LogCleanerTest {
// first time through the control batch is retained as an empty batch
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
// the empty control batch does not cause an exception when cleaned
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
@@ -515,7 +515,7 @@ class LogCleanerTest {
log.roll()
// Both the record and the marker should remain after cleaning
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(0, 1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@@ -540,12 +540,12 @@ class LogCleanerTest {
// Both the batch and the marker should remain after cleaning. The batch is retained
// because it is the last entry for this producerId. The marker is retained because
// there are still batches remaining from this transaction.
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// The empty batch and the marker is still retained after a second cleaning.
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@@ -570,12 +570,12 @@ class LogCleanerTest {
log.roll()
// delete horizon set to 0 to verify marker is not removed early
- val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+ val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = 0L)._1
assertEquals(List(3), LogTest.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
// clean again with large delete horizon and verify the marker is removed
- cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(3), LogTest.keysInLog(log))
assertEquals(List(4, 5), offsetsInLog(log))
}
@@ -609,12 +609,12 @@ class LogCleanerTest {
// Both transactional batches will be cleaned. The last one will remain in the log
// as an empty batch in order to preserve the producer sequence number and epoch
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
// On the second round of cleaning, the marker from the first transaction should be removed.
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(3, 4, 5), offsetsInLog(log))
assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
}
@@ -646,14 +646,14 @@ class LogCleanerTest {
assertAbortedTransactionIndexed()
// first time through the records are removed
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertAbortedTransactionIndexed()
assertEquals(List(), LogTest.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
// the empty batch remains if cleaned again because it still holds the last sequence
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertAbortedTransactionIndexed()
assertEquals(List(), LogTest.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
@@ -663,13 +663,13 @@ class LogCleanerTest {
appendProducer(Seq(1))
log.roll()
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertAbortedTransactionIndexed()
assertEquals(List(1), LogTest.keysInLog(log))
assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(1), LogTest.keysInLog(log))
assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
assertEquals(List(3), lastOffsetsPerBatchInLog(log))