From f50a0f580ffddeba3ea3d581974c64b699c4936d Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sun, 27 Oct 2019 10:21:15 -0700 Subject: [PATCH 01/26] [KAFKA-8522] Implementing proposal as outlined in KIP-534 --- .../org/apache/kafka/common/record/DefaultRecordBatch.java | 6 ++++++ .../java/org/apache/kafka/common/record/RecordBatch.java | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 6d79b268575ab..15f6a087af206 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -128,6 +128,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe private static final byte COMPRESSION_CODEC_MASK = 0x07; private static final byte TRANSACTIONAL_FLAG_MASK = 0x10; private static final int CONTROL_FLAG_MASK = 0x20; + private static final byte DELETE_HORIZON_FLAG_MASK = 0x40; private static final byte TIMESTAMP_TYPE_MASK = 0x08; private static final int MAX_SKIP_BUFFER_SIZE = 2048; @@ -245,6 +246,11 @@ public boolean isTransactional() { return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0; } + @Override + public boolean isBaseTimestampDeleteHorizon() { + return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; + } + @Override public boolean isControlBatch() { return (attributes() & CONTROL_FLAG_MASK) > 0; diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 65a6a95fbe41f..7868ff1ac2418 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -210,6 +210,12 @@ public interface RecordBatch extends Iterable { */ boolean isTransactional(); + /** + * Whether or not the base timestamp has been set to the delete horizon + * @return true if it is, false otherwise + */ + boolean isBaseTimestampDeleteHorizon(); + /** * Get the partition leader epoch of this record batch. 
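 * (Aside for readers of this first patch, not part of the original javadoc: the new DELETE_HORIZON_FLAG_MASK above is 0x40, i.e. bit 6 of the two-byte attributes field, next to transactional (0x10, bit 4) and control (0x20, bit 5). The accessor reduces to a plain mask test, sketched here with an invented helper name:
 *
 *    static boolean hasDeleteHorizonFlag(short attributes) {
 *        return (attributes & 0x40) > 0;   // same test as isBaseTimestampDeleteHorizon()
 *    }
 * )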
* @return The leader epoch or -1 if it is unknown From 0886382db8f3b8057ec9e7c087d812eda3122da5 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 9 Nov 2019 20:23:52 -0800 Subject: [PATCH 02/26] Adding comments --- .../common/record/DefaultRecordBatch.java | 66 ++++++++++++++++--- .../kafka/common/record/MemoryRecords.java | 3 +- .../common/record/MemoryRecordsBuilder.java | 14 +++- .../kafka/common/record/RecordBatch.java | 2 +- .../src/main/scala/kafka/log/LogSegment.scala | 2 + 5 files changed, 73 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 15f6a087af206..d51a748d5dc2a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -91,9 +91,9 @@ * * The current attributes are given below: * - * ------------------------------------------------------------------------------------------------- - * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | - * ------------------------------------------------------------------------------------------------- + * --------------------------------------------------------------------------------------------------------------------------- + * | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | + * --------------------------------------------------------------------------------------------------------------------------- */ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch { static final int BASE_OFFSET_OFFSET = 0; @@ -247,7 +247,7 @@ public boolean isTransactional() { } @Override - public boolean isBaseTimestampDeleteHorizon() { + public boolean isDeleteHorizonSet() { return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; } @@ -366,7 +366,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) { if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp) return; - byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch()); + byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), isDeleteHorizonSet()); buffer.putShort(ATTRIBUTES_OFFSET, attributes); buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp); long crc = computeChecksum(); @@ -413,7 +413,7 @@ public int hashCode() { } private static byte computeAttributes(CompressionType type, TimestampType timestampType, - boolean isTransactional, boolean isControl) { + boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) { if (timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " + "format v2 and above"); @@ -425,9 +425,29 @@ private static byte computeAttributes(CompressionType type, TimestampType timest attributes |= COMPRESSION_CODEC_MASK & type.id; if (timestampType == TimestampType.LOG_APPEND_TIME) attributes |= TIMESTAMP_TYPE_MASK; + if (isDeleteHorizonSet) + attributes |= DELETE_HORIZON_FLAG_MASK; return attributes; } + public static void writeEmptyHeader(ByteBuffer buffer, + byte magic, + long producerId, + short producerEpoch, + int baseSequence, + long baseOffset, + long lastOffset, + int 
partitionLeaderEpoch, + TimestampType timestampType, + long timestamp, + boolean isTransactional, + boolean isControlRecord) { + int offsetDelta = (int) (lastOffset - baseOffset); + writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic, + CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId, + producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0); + } + public static void writeEmptyHeader(ByteBuffer buffer, byte magic, long producerId, @@ -439,11 +459,33 @@ public static void writeEmptyHeader(ByteBuffer buffer, TimestampType timestampType, long timestamp, boolean isTransactional, - boolean isControlRecord) { + boolean isControlRecord, + boolean isDeleteHorizonSet) { int offsetDelta = (int) (lastOffset - baseOffset); writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic, CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId, - producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0); + producerEpoch, baseSequence, isTransactional, isControlRecord, isDeleteHorizonSet, partitionLeaderEpoch, 0); + } + + static void writeHeader(ByteBuffer buffer, + long baseOffset, + int lastOffsetDelta, + int sizeInBytes, + byte magic, + CompressionType compressionType, + TimestampType timestampType, + long firstTimestamp, + long maxTimestamp, + long producerId, + short epoch, + int sequence, + boolean isTransactional, + boolean isControlBatch, + int partitionLeaderEpoch, + int numRecords) { + writeHeader(buffer, baseOffset, lastOffsetDelta, sizeInBytes, magic, compressionType, + timestampType, firstTimestamp, maxTimestamp, producerId, epoch, sequence, + isTransactional, isControlBatch, false, partitionLeaderEpoch, numRecords); } static void writeHeader(ByteBuffer buffer, @@ -460,6 +502,7 @@ static void writeHeader(ByteBuffer buffer, int sequence, boolean isTransactional, boolean isControlBatch, + boolean isDeleteHorizonSet, int partitionLeaderEpoch, int numRecords) { if (magic < RecordBatch.CURRENT_MAGIC_VALUE) @@ -467,7 +510,7 @@ static void writeHeader(ByteBuffer buffer, if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP) throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp); - short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch); + short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet); int position = buffer.position(); buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset); @@ -705,6 +748,11 @@ public boolean isTransactional() { return loadBatchHeader().isTransactional(); } + @Override + public boolean isDeleteHorizonSet() { + return loadBatchHeader().isDeleteHorizonSet(); + } + @Override public boolean isControlBatch() { return loadBatchHeader().isControlBatch(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 8f73565d1b40f..48275ec4ce19c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -249,7 +249,8 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic, originalBatch.compressionType(), timestampType, 
baseOffset, logAppendTime, originalBatch.producerId(), originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(), - originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit()); + originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), + originalBatch.isDeleteHorizonSet()); for (Record record : retainedRecords) builder.append(record); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 054fb86199884..1319e2d089326 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -57,6 +57,7 @@ public void write(int b) { private final long baseOffset; private final long logAppendTime; private final boolean isControlBatch; + private final boolean isDeleteHorizonSet; private final int partitionLeaderEpoch; private final int writeLimit; private final int batchHeaderSizeInBytes; @@ -94,7 +95,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, boolean isTransactional, boolean isControlBatch, int partitionLeaderEpoch, - int writeLimit) { + int writeLimit, + boolean isDeleteHorizonSet) { if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("TimestampType must be set for magic >= 0"); if (magic < RecordBatch.MAGIC_VALUE_V2) { @@ -120,6 +122,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.baseSequence = baseSequence; this.isTransactional = isTransactional; this.isControlBatch = isControlBatch; + this.isDeleteHorizonSet = isDeleteHorizonSet; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); @@ -161,11 +164,12 @@ public MemoryRecordsBuilder(ByteBuffer buffer, int baseSequence, boolean isTransactional, boolean isControlBatch, + boolean isDeleteHorizonSet, int partitionLeaderEpoch, int writeLimit) { this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, - writeLimit); + writeLimit, isDeleteHorizonSet); } public ByteBuffer buffer() { @@ -192,6 +196,10 @@ public boolean isTransactional() { return isTransactional; } + public boolean isDeleteHorizonSet() { + return isDeleteHorizonSet; + } + /** * Close this builder and return the resulting buffer. 
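 * (Aside, not part of the patch: after this change the full-argument constructor takes the new flag as its trailing parameter, so a caller would look roughly like
 *
 *    new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType,
 *        baseOffset, logAppendTime, producerId, producerEpoch, baseSequence,
 *        isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit,
 *        isDeleteHorizonSet);
 *
 *  where the argument names stand in for the caller's values.)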
* @return The built log buffer @@ -364,7 +372,7 @@ private int writeDefaultBatchHeader() { maxTimestamp = this.maxTimestamp; DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, - firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, + firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, isDeleteHorizonSet, partitionLeaderEpoch, numRecords); buffer.position(pos); diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 7868ff1ac2418..127ca8faa22a1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -214,7 +214,7 @@ public interface RecordBatch extends Iterable { * Whether or not the base timestamp has been set to the delete horizon * @return true if it is, false otherwise */ - boolean isBaseTimestampDeleteHorizon(); + boolean isDeleteHorizonSet(); /** * Get the partition leader epoch of this record batch. diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 6e336655957d5..ca678a62fde16 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -62,6 +62,8 @@ class LogSegment private[log] (val log: FileRecords, val rollJitterMs: Long, val time: Time) extends Logging { + def firstCleanedTime: Long = -1L + def offsetIndex: OffsetIndex = lazyOffsetIndex.get def timeIndex: TimeIndex = lazyTimeIndex.get From bff17b66c88375c3752d9f17f3bbc89bbbe0daa9 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 11 Nov 2019 10:31:13 -0800 Subject: [PATCH 03/26] Adding mechanism to modify first timestamp --- .../common/record/DefaultRecordBatch.java | 16 +++++++++++++ .../kafka/common/record/MemoryRecords.java | 24 +++++++++++++------ .../common/record/MemoryRecordsBuilder.java | 23 +++++++++++------- .../kafka/common/record/RecordBatch.java | 6 +++++ .../src/main/scala/kafka/log/LogCleaner.scala | 11 +++++++-- 5 files changed, 62 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index d51a748d5dc2a..6312ce6ba69fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -251,6 +251,14 @@ public boolean isDeleteHorizonSet() { return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; } + @Override + public long deleteHorizonMs() { + if (isDeleteHorizonSet()) { + return firstTimestamp(); + } + return RecordBatch.NO_TIMESTAMP; + } + @Override public boolean isControlBatch() { return (attributes() & CONTROL_FLAG_MASK) > 0; @@ -753,6 +761,14 @@ public boolean isDeleteHorizonSet() { return loadBatchHeader().isDeleteHorizonSet(); } + @Override + public long deleteHorizonMs() { + if (isDeleteHorizonSet()) { + return super.loadFullBatch().deleteHorizonMs(); + } + return RecordBatch.NO_TIMESTAMP; + } + @Override public boolean isControlBatch() { return loadBatchHeader().isControlBatch(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 
48275ec4ce19c..5282a4895f7d7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -159,6 +159,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords = new ArrayList<>(); try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { @@ -199,7 +201,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize) @@ -238,7 +240,8 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords, - ByteBufferOutputStream bufferOutputStream) { + ByteBufferOutputStream bufferOutputStream, + final long deleteHorizonMs) { byte magic = originalBatch.magic(); TimestampType timestampType = originalBatch.timestampType(); long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? @@ -250,7 +253,7 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(), originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(), originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), - originalBatch.isDeleteHorizonSet()); + deleteHorizonMs); for (Record record : retainedRecords) builder.append(record); @@ -319,6 +322,13 @@ public enum BatchRetention { * explicitly discarded with {@link BatchRetention#DELETE} will be considered. */ protected abstract boolean shouldRetainRecord(RecordBatch recordBatch, Record record); + + /** + * Retrieves the delete horizon ms for a specific batch + */ + protected long retrieveDeleteHorizon(RecordBatch recordBatch) { + return -1L; + } } public static class FilterResult { @@ -504,7 +514,7 @@ public static MemoryRecordsBuilder builder(ByteBuffer buffer, boolean isControlBatch, int partitionLeaderEpoch) { return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset, - logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, + logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch, buffer.remaining()); } @@ -596,7 +606,7 @@ public static MemoryRecords withRecords(byte magic, long initialOffset, Compress logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferStream, magic, compressionType, timestampType, initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, - partitionLeaderEpoch, sizeEstimate); + partitionLeaderEpoch, sizeEstimate, RecordBatch.NO_TIMESTAMP); for (SimpleRecord record : records) builder.append(record); return builder.build(); @@ -632,7 +642,7 @@ public static void writeEndTransactionalMarker(ByteBuffer buffer, long initialOf boolean isControlBatch = true; MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, initialOffset, timestamp, producerId, producerEpoch, - RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, partitionLeaderEpoch, + RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch, buffer.capacity()); builder.appendEndTxnMarker(timestamp, marker); builder.close(); diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 1319e2d089326..a822a0a34dbbb 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -57,7 +57,6 @@ public void write(int b) { private final long baseOffset; private final long logAppendTime; private final boolean isControlBatch; - private final boolean isDeleteHorizonSet; private final int partitionLeaderEpoch; private final int writeLimit; private final int batchHeaderSizeInBytes; @@ -76,6 +75,7 @@ public void write(int b) { private int numRecords = 0; private float actualCompressionRatio = 1; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; + private long deleteHorizonMs; private long offsetOfMaxTimestamp = -1; private Long lastOffset = null; private Long firstTimestamp = null; @@ -96,7 +96,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, boolean isControlBatch, int partitionLeaderEpoch, int writeLimit, - boolean isDeleteHorizonSet) { + long deleteHorizonMs) { if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("TimestampType must be set for magic >= 0"); if (magic < RecordBatch.MAGIC_VALUE_V2) { @@ -122,7 +122,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.baseSequence = baseSequence; this.isTransactional = isTransactional; this.isControlBatch = isControlBatch; - this.isDeleteHorizonSet = isDeleteHorizonSet; + this.deleteHorizonMs = deleteHorizonMs; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); @@ -164,12 +164,12 @@ public MemoryRecordsBuilder(ByteBuffer buffer, int baseSequence, boolean isTransactional, boolean isControlBatch, - boolean isDeleteHorizonSet, + long deleteHorizonMs, int partitionLeaderEpoch, int writeLimit) { this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, - writeLimit, isDeleteHorizonSet); + writeLimit, deleteHorizonMs); } public ByteBuffer buffer() { @@ -197,7 +197,7 @@ public boolean isTransactional() { } public boolean isDeleteHorizonSet() { - return isDeleteHorizonSet; + return deleteHorizonMs >= 0L; } /** @@ -372,7 +372,7 @@ private int writeDefaultBatchHeader() { maxTimestamp = this.maxTimestamp; DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, - firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, isDeleteHorizonSet, + firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, isDeleteHorizonSet(), partitionLeaderEpoch, numRecords); buffer.position(pos); @@ -419,8 +419,13 @@ private Long appendWithOffset(long offset, boolean isControlRecord, long timesta if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers"); - if (firstTimestamp == null) - firstTimestamp = timestamp; + if (firstTimestamp == null) { + if (isDeleteHorizonSet()) { + firstTimestamp = deleteHorizonMs; + } else { + firstTimestamp = timestamp; + } + } if (magic > 
RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 127ca8faa22a1..ae17925e760ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -216,6 +216,12 @@ public interface RecordBatch extends Iterable { */ boolean isDeleteHorizonSet(); + /** + * Get the delete horizon, returns -1L if the first timestamp is not the delete horizon + * @return timestamp of the delete horizon + */ + long deleteHorizonMs(); + /** * Get the partition leader epoch of this record batch. * @return The leader epoch or -1 if it is unknown diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 219f49716f065..ef6a2613461b0 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -590,7 +590,7 @@ private[log] class Cleaner(val id: Int, try { cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize, - transactionMetadata, lastOffsetOfActiveProducers, stats) + transactionMetadata, lastOffsetOfActiveProducers, stats, log.config.deleteRetentionMs) } catch { case e: LogSegmentOffsetOverflowException => // Split the current segment. It's also safest to abort the current cleaning process, so that we retry from @@ -643,7 +643,8 @@ private[log] class Cleaner(val id: Int, maxLogMessageSize: Int, transactionMetadata: CleanedTransactionMetadata, lastRecordsOfActiveProducers: Map[Long, LastRecord], - stats: CleanerStats): Unit = { + stats: CleanerStats, + tombstoneRetentionMs: Long): Unit = { val logCleanerFilter: RecordFilter = new RecordFilter { var discardBatchRecords: Boolean = _ @@ -682,6 +683,12 @@ private[log] class Cleaner(val id: Int, else Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats) } + + override def retrieveDeleteHorizon(batch: RecordBatch) : Long = { + if (batch.isDeleteHorizonSet()) + return batch.deleteHorizonMs() // means that we keep the old timestamp stored + return time.milliseconds() + tombstoneRetentionMs; + } } var position = 0 From 4d6b5a19a8e77d56fb3c71da89e1d15ac9635299 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Fri, 15 Nov 2019 19:30:20 -0800 Subject: [PATCH 04/26] Cleaning things up --- .../record/AbstractLegacyRecordBatch.java | 30 +++++++++++++++++++ .../common/record/DefaultRecordBatch.java | 9 ++++-- .../kafka/common/record/MemoryRecords.java | 17 ++++++----- .../common/record/MemoryRecordsBuilder.java | 12 ++++---- .../src/main/scala/kafka/log/LogCleaner.scala | 8 ++++- 5 files changed, 61 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 83637640af49d..00a137acdb949 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -431,6 +431,16 @@ public LegacyRecord outerRecord() { return record; } + @Override + public long deleteHorizonMs() { + return -1L; + } + + @Override + public boolean isDeleteHorizonSet() { + return false; + } + @Override public boolean equals(Object 
o) { if (this == o) @@ -468,6 +478,16 @@ public long offset() { return buffer.getLong(OFFSET_OFFSET); } + @Override + public long deleteHorizonMs() { + return RecordBatch.NO_TIMESTAMP; + } + + @Override + public boolean isDeleteHorizonSet() { + return false; + } + @Override public LegacyRecord outerRecord() { return record; @@ -557,6 +577,16 @@ public long baseOffset() { return loadFullBatch().baseOffset(); } + @Override + public long deleteHorizonMs() { + return loadFullBatch().deleteHorizonMs(); + } + + @Override + public boolean isDeleteHorizonSet() { + return loadFullBatch().isDeleteHorizonSet(); + } + @Override public long lastOffset() { return offset; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 6312ce6ba69fa..3fecdeb6d316d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -156,10 +156,15 @@ public void ensureValid() { } /** - * Get the timestamp of the first record in this batch. It is always the create time of the record even if the + * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the * timestamp type of the batch is log append time. * - * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty + * There is the possibility that the first timestamp had been set to the delete horizon of the batch, + * in which case, the delete horizon will be returned instead. + * + * @return The first timestamp if the batch's delete horizon has not been set + * The delete horizon if the batch's delete horizon has been set + * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty */ public long firstTimestamp() { return buffer.getLong(FIRST_TIMESTAMP_OFFSET); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 5282a4895f7d7..09c0a4d194281 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -172,7 +172,8 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords = new ArrayList<>(); try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { @@ -197,7 +198,9 @@ private static FilterResult filterTo(TopicPartition partition, Iterable RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("TimestampType must be set for magic >= 0"); if (magic < RecordBatch.MAGIC_VALUE_V2) { @@ -122,7 +121,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.baseSequence = baseSequence; this.isTransactional = isTransactional; this.isControlBatch = isControlBatch; - this.deleteHorizonMs = deleteHorizonMs; + this.deleteHorizonMs = -1L; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); @@ -164,12 +163,15 @@ public MemoryRecordsBuilder(ByteBuffer buffer, int baseSequence, boolean isTransactional, boolean isControlBatch, - long deleteHorizonMs, int partitionLeaderEpoch, int writeLimit) { this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime, producerId, producerEpoch, baseSequence, 
isTransactional, isControlBatch, partitionLeaderEpoch, - writeLimit, deleteHorizonMs); + writeLimit); + } + + public void setDeleteHorizonMs(final long deleteHorizonMs) { + this.deleteHorizonMs = deleteHorizonMs; } public ByteBuffer buffer() { diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index ef6a2613461b0..7811fc83f885b 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -687,6 +687,11 @@ private[log] class Cleaner(val id: Int, override def retrieveDeleteHorizon(batch: RecordBatch) : Long = { if (batch.isDeleteHorizonSet()) return batch.deleteHorizonMs() // means that we keep the old timestamp stored + + // check that the control batch has been emptied of records + // if not, then we do not set a delete horizon until that is true + if (batch.isControlBatch() && transactionMetadata.onControlBatchRead(batch)) + return -1L return time.milliseconds() + tombstoneRetentionMs; } } @@ -793,7 +798,8 @@ private[log] class Cleaner(val id: Int, */ val latestOffsetForKey = record.offset() >= foundOffset val isRetainedValue = record.hasValue || retainDeletes - latestOffsetForKey && isRetainedValue + val tombstoneRetention = !batch.isDeleteHorizonSet() || time.milliseconds() < batch.deleteHorizonMs + latestOffsetForKey && isRetainedValue && tombstoneRetention } else { stats.invalidMessage() false From aac26a7a48f100d3e34203bddf733ef8f4d586b6 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 16 Nov 2019 13:02:01 -0800 Subject: [PATCH 05/26] Fixing checkstyle --- .../main/java/org/apache/kafka/common/record/MemoryRecords.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 09c0a4d194281..16e74e88a5bd2 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -173,7 +173,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords = new ArrayList<>(); try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { From f97478de68eb9b1f52ef1af2f59d70afb06b2577 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 18 Nov 2019 19:30:25 -0800 Subject: [PATCH 06/26] Fixing funny logic --- core/src/main/scala/kafka/log/LogCleaner.scala | 7 ++++--- core/src/main/scala/kafka/log/LogSegment.scala | 2 -- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 7811fc83f885b..eaf2f7782e1cf 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -797,9 +797,10 @@ private[log] class Cleaner(val id: Int, * 2) The message doesn't has value but it can't be deleted now. 
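 * (Aside on the predicate below, not part of the original comment: with KIP-534 a tombstone is retained while
 *
 *    !batch.isDeleteHorizonSet() || time.milliseconds() < batch.deleteHorizonMs
 *
 *  that is, it only becomes removable once its batch carries a delete horizon and the cleaner's clock has passed that horizon.)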
*/ val latestOffsetForKey = record.offset() >= foundOffset - val isRetainedValue = record.hasValue || retainDeletes - val tombstoneRetention = !batch.isDeleteHorizonSet() || time.milliseconds() < batch.deleteHorizonMs - latestOffsetForKey && isRetainedValue && tombstoneRetention + val isRetainedValue = record.hasValue || + (!batch.isDeleteHorizonSet() || + time.milliseconds() < batch.deleteHorizonMs) + latestOffsetForKey && isRetainedValue } else { stats.invalidMessage() false diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index ca678a62fde16..6e336655957d5 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -62,8 +62,6 @@ class LogSegment private[log] (val log: FileRecords, val rollJitterMs: Long, val time: Time) extends Logging { - def firstCleanedTime: Long = -1L - def offsetIndex: OffsetIndex = lazyOffsetIndex.get def timeIndex: TimeIndex = lazyTimeIndex.get From 8250b90287b9ce12a1bc13c7654ebf78a93e82bd Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Wed, 20 Nov 2019 18:03:11 -0800 Subject: [PATCH 07/26] Fixing minor test --- core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index bb30287bb12bd..8e7de1d663cb2 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1027,15 +1027,17 @@ class LogCleanerTest { // append unkeyed messages while(log.numberOfSegments < 2) log.appendAsLeader(unkeyedRecord(log.logEndOffset.toInt), leaderEpoch = 0) - val numInvalidMessages = unkeyedMessageCountInLog(log) + val numInvalidMessages = unkeyedMessageCountInLog(log) val sizeWithUnkeyedMessages = log.size // append keyed messages while(log.numberOfSegments < 3) log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0) - val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages + // this is expected size, just need to replace it shortly due to the batch's size estimation being off + // should check more accurately for batch sizes (perhaps in log.getSizeInBytes()) + val expectedSizeAfterCleaning = 1073 val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset)) assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log)) From b30015206c2c168069ea43b6b766204a627a5f46 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sun, 1 Dec 2019 20:20:34 -0800 Subject: [PATCH 08/26] Addressing most comments --- .../record/AbstractLegacyRecordBatch.java | 2 +- .../common/record/DefaultRecordBatch.java | 32 ++++++++++--------- .../kafka/common/record/MemoryRecords.java | 27 +++++++++++++++- .../common/record/MemoryRecordsBuilder.java | 5 ++- .../src/main/scala/kafka/log/LogCleaner.scala | 2 ++ .../scala/unit/kafka/log/LogCleanerTest.scala | 6 ++-- 6 files changed, 50 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 00a137acdb949..818c95778fd5a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -433,7 +433,7 @@ public LegacyRecord outerRecord() { @Override public long deleteHorizonMs() { - return -1L; + return RecordBatch.NO_TIMESTAMP; } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 3fecdeb6d316d..f44146b9ef9ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -89,6 +89,10 @@ * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains * the previous value prior to becoming empty. * + * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to + * the time for which tombstones / transaction markers needs to be removed. If it is true, then the first timestamp is + * the delete horizon, otherwise, it is merely the first timestamp of the record batch. + * * The current attributes are given below: * * --------------------------------------------------------------------------------------------------------------------------- @@ -258,9 +262,8 @@ public boolean isDeleteHorizonSet() { @Override public long deleteHorizonMs() { - if (isDeleteHorizonSet()) { + if (isDeleteHorizonSet()) return firstTimestamp(); - } return RecordBatch.NO_TIMESTAMP; } @@ -444,17 +447,17 @@ private static byte computeAttributes(CompressionType type, TimestampType timest } public static void writeEmptyHeader(ByteBuffer buffer, - byte magic, - long producerId, - short producerEpoch, - int baseSequence, - long baseOffset, - long lastOffset, - int partitionLeaderEpoch, - TimestampType timestampType, - long timestamp, - boolean isTransactional, - boolean isControlRecord) { + byte magic, + long producerId, + short producerEpoch, + int baseSequence, + long baseOffset, + long lastOffset, + int partitionLeaderEpoch, + TimestampType timestampType, + long timestamp, + boolean isTransactional, + boolean isControlRecord) { int offsetDelta = (int) (lastOffset - baseOffset); writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic, CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId, @@ -768,9 +771,8 @@ public boolean isDeleteHorizonSet() { @Override public long deleteHorizonMs() { - if (isDeleteHorizonSet()) { + if (isDeleteHorizonSet()) return super.loadFullBatch().deleteHorizonMs(); - } return RecordBatch.NO_TIMESTAMP; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 16e74e88a5bd2..05fa07cf13097 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -174,6 +174,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords = new ArrayList<>(); try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { @@ -194,13 +195,18 @@ private static FilterResult filterTo(TopicPartition partition, Iterable RecordBatch.MAGIC_VALUE_V1) { diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index eaf2f7782e1cf..aacfa09ca127b 100644 --- 
a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -634,6 +634,7 @@ private[log] class Cleaner(val id: Int, * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment * @param maxLogMessageSize The maximum message size of the corresponding topic * @param stats Collector for cleaning statistics + * @param tombstoneRetentionMs Defines how long a tombstone should be kept as defined by log configuration */ private[log] def cleanInto(topicPartition: TopicPartition, sourceRecords: FileRecords, @@ -800,6 +801,7 @@ private[log] class Cleaner(val id: Int, val isRetainedValue = record.hasValue || (!batch.isDeleteHorizonSet() || time.milliseconds() < batch.deleteHorizonMs) + latestOffsetForKey && isRetainedValue } else { stats.invalidMessage() diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 8e7de1d663cb2..bb30287bb12bd 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1027,17 +1027,15 @@ class LogCleanerTest { // append unkeyed messages while(log.numberOfSegments < 2) log.appendAsLeader(unkeyedRecord(log.logEndOffset.toInt), leaderEpoch = 0) - val numInvalidMessages = unkeyedMessageCountInLog(log) + val sizeWithUnkeyedMessages = log.size // append keyed messages while(log.numberOfSegments < 3) log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0) - // this is expected size, just need to replace it shortly due to the batch's size estimation being off - // should check more accurately for batch sizes (perhaps in log.getSizeInBytes()) - val expectedSizeAfterCleaning = 1073 + val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset)) assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log)) From 7301fa9267ec2011aa0a12222b9580db42e9db9e Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 2 Dec 2019 19:12:37 -0800 Subject: [PATCH 09/26] Fixing all comments --- .../kafka/common/record/MemoryRecords.java | 119 +++++++++++------- .../src/main/scala/kafka/log/LogCleaner.scala | 23 ++-- 2 files changed, 86 insertions(+), 56 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 05fa07cf13097..b215a525d8cac 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -159,7 +159,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords = new ArrayList<>(); - try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { - while (iterator.hasNext()) { - Record record = iterator.next(); - filterResult.messagesRead += 1; - - if (filter.shouldRetainRecord(batch, record)) { - // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite - // the corrupted batch with correct data. 
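// (Aside, editorial sketch rather than patch content: the horizon that filterTo hands to
//  buildRetainedRecordsInto follows a keep-or-stamp rule, roughly
//
//    long deleteHorizonMs = batch.isDeleteHorizonSet()
//            ? batch.deleteHorizonMs()               // already stamped: keep it
//            : filter.retrieveDeleteHorizon(batch);  // first cleaning: propose one
//
//  so repeated cleanings cannot keep pushing a tombstone's horizon further into the future.)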
- if (!record.hasMagic(batchMagic)) - writeOriginalBatch = false; - - if (record.offset() > maxOffset) - maxOffset = record.offset(); - - retainedRecords.add(record); - } else { - writeOriginalBatch = false; - } - - if (!record.hasValue()) { - containsTombstones = true; - } - } - } + final BatchIterationResult iterationResult = iterateOverBatch(batch, decompressionBufferSupplier, filterResult, filter, + batchMagic, writeOriginalBatch, maxOffset, retainedRecords, + containsTombstonesOrMarker); + containsTombstonesOrMarker = iterationResult.containsTombstonesOrMarker(); + writeOriginalBatch = iterationResult.shouldWriteOriginalBatch(); + maxOffset = iterationResult.maxOffset(); if (!retainedRecords.isEmpty()) { // we check if the delete horizon should be set to a new value // in which case, we need to reset the base timestamp and overwrite the timestamp deltas // if the batch does not contain tombstones, then we don't need to overwrite batch - if (writeOriginalBatch && (!shouldSetDeleteHorizon || !containsTombstones)) { + if (writeOriginalBatch && (!shouldSetDeleteHorizon || !containsTombstonesOrMarker)) { batch.writeTo(bufferOutputStream); filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false); } else { - MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, firstTimestamp); + if (!containsTombstonesOrMarker) + deleteHorizonMs = RecordBatch.NO_TIMESTAMP; + MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs); MemoryRecords records = builder.build(); int filteredBatchSize = records.sizeInBytes(); if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize) @@ -247,6 +231,64 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords, + boolean containsTombstonesOrMarker) { + try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { + while (iterator.hasNext()) { + Record record = iterator.next(); + filterResult.messagesRead += 1; + + if (filter.shouldRetainRecord(batch, record)) { + // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite + // the corrupted batch with correct data. 
+ if (!record.hasMagic(batchMagic)) + writeOriginalBatch = false; + + if (record.offset() > maxOffset) + maxOffset = record.offset(); + + retainedRecords.add(record); + } else { + writeOriginalBatch = false; + } + + if (!record.hasValue()) { + containsTombstonesOrMarker = true; + } + } + return new BatchIterationResult(writeOriginalBatch, containsTombstonesOrMarker, maxOffset); + } + } + + private static class BatchIterationResult { + private final boolean writeOriginalBatch; + private final boolean containsTombstonesOrMarker; + private final long maxOffset; + public BatchIterationResult(final boolean writeOriginalBatch, + final boolean containsTombstonesOrMarker, + final long maxOffset) { + this.writeOriginalBatch = writeOriginalBatch; + this.containsTombstonesOrMarker = containsTombstonesOrMarker; + this.maxOffset = maxOffset; + } + public boolean shouldWriteOriginalBatch() { + return this.writeOriginalBatch; + } + public boolean containsTombstonesOrMarker() { + return this.containsTombstonesOrMarker; + } + public long maxOffset() { + return this.maxOffset; + } + } + private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch, List retainedRecords, ByteBufferOutputStream bufferOutputStream, @@ -319,31 +361,12 @@ public enum BatchRetention { DELETE_EMPTY // Delete the batch if it is empty } - public class BatchInfo { - private final BatchRetention batchRetention; - private final boolean isControlBatchEmpty; - public BatchInfo(final BatchRetention batchRetention, final boolean isControlBatchEmpty) { - this.batchRetention = batchRetention; - this.isControlBatchEmpty = isControlBatchEmpty; - } - public BatchRetention batchRetention() { - return batchRetention; - } - public boolean isControlBatchEmpty() { - return isControlBatchEmpty; - } - } - /** * Check whether the full batch can be discarded (i.e. whether we even need to * check the records individually). */ protected abstract BatchRetention checkBatchRetention(RecordBatch batch); - protected BatchInfo shouldRetendBatch(RecordBatch batch) { - return new BatchInfo(checkBatchRetention(batch), false); - } - /** * Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)} * is used prior to checking individual record retention. Only records from batches which were not diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index aacfa09ca127b..d26e6230765a8 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -648,11 +648,14 @@ private[log] class Cleaner(val id: Int, tombstoneRetentionMs: Long): Unit = { val logCleanerFilter: RecordFilter = new RecordFilter { var discardBatchRecords: Boolean = _ + var isControlBatchEmpty: Boolean = _ override def checkBatchRetention(batch: RecordBatch): BatchRetention = { // we piggy-back on the tombstone retention logic to delay deletion of transaction markers. // note that we will never delete a marker until all the records from that transaction are removed. 
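// (Aside, not part of the patch: piggy-backing gives transaction markers a two-phase
//  lifetime. A first cleaning that finds the transaction's records gone stamps the empty
//  control batch with a horizon; only a later cleaning that sees
//  now >= deleteHorizonMs may drop the marker itself, so no reader can observe
//  transactional data without its marker.)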
- discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) + val (shouldDiscardBatchRecords, canDiscardBatch) = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) + discardBatchRecords = shouldDiscardBatchRecords + isControlBatchEmpty = canDiscardBatch def isBatchLastRecordOfProducer: Boolean = { // We retain the batch in order to preserve the state of active producers. There are three cases: @@ -691,7 +694,7 @@ private[log] class Cleaner(val id: Int, // check that the control batch has been emptied of records // if not, then we do not set a delete horizon until that is true - if (batch.isControlBatch() && transactionMetadata.onControlBatchRead(batch)) + if (batch.isControlBatch() && isControlBatchEmpty) return -1L return time.milliseconds() + tombstoneRetentionMs; } @@ -770,13 +773,13 @@ private[log] class Cleaner(val id: Int, private def shouldDiscardBatch(batch: RecordBatch, transactionMetadata: CleanedTransactionMetadata, - retainTxnMarkers: Boolean): Boolean = { + retainTxnMarkers: Boolean): (Boolean, Boolean) = { if (batch.isControlBatch) { val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch) - canDiscardControlBatch && !retainTxnMarkers + (canDiscardControlBatch && !retainTxnMarkers, canDiscardControlBatch) } else { val canDiscardBatch = transactionMetadata.onBatchRead(batch) - canDiscardBatch + (canDiscardBatch, false) } } @@ -798,9 +801,13 @@ private[log] class Cleaner(val id: Int, * 2) The message doesn't has value but it can't be deleted now. */ val latestOffsetForKey = record.offset() >= foundOffset - val isRetainedValue = record.hasValue || - (!batch.isDeleteHorizonSet() || - time.milliseconds() < batch.deleteHorizonMs) + val isLatestVersion = batch.magic() < RecordBatch.MAGIC_VALUE_V2 + val shouldRetainDeletes = isLatestVersion match { + case true => (!batch.isDeleteHorizonSet() || + time.milliseconds() < batch.deleteHorizonMs) + case false => retainDeletes + } + val isRetainedValue = record.hasValue || shouldRetainDeletes latestOffsetForKey && isRetainedValue } else { From e209c4bd5e0a0127353a8c421243f5758b60b5cc Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Thu, 5 Dec 2019 21:16:55 -0800 Subject: [PATCH 10/26] Addressing most comments --- .../record/AbstractLegacyRecordBatch.java | 10 +++--- .../common/record/DefaultRecordBatch.java | 14 ++++---- .../kafka/common/record/MemoryRecords.java | 9 +++-- .../common/record/MemoryRecordsBuilder.java | 33 +++++++++++++----- .../kafka/common/record/RecordBatch.java | 2 +- .../src/main/scala/kafka/log/LogCleaner.scala | 34 ++++++++++--------- 6 files changed, 61 insertions(+), 41 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 818c95778fd5a..cf74f24f490f0 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -437,7 +437,7 @@ public long deleteHorizonMs() { } @Override - public boolean isDeleteHorizonSet() { + public boolean deleteHorizonSet() { return false; } @@ -484,7 +484,7 @@ public long deleteHorizonMs() { } @Override - public boolean isDeleteHorizonSet() { + public boolean deleteHorizonSet() { return false; } @@ -579,12 +579,12 @@ public long baseOffset() { @Override public long deleteHorizonMs() { - return 
loadFullBatch().deleteHorizonMs(); + return RecordBatch.NO_TIMESTAMP; } @Override - public boolean isDeleteHorizonSet() { - return loadFullBatch().isDeleteHorizonSet(); + public boolean deleteHorizonSet() { + return false; } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index f44146b9ef9ab..e0d7549fedebc 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -256,13 +256,13 @@ public boolean isTransactional() { } @Override - public boolean isDeleteHorizonSet() { + public boolean deleteHorizonSet() { return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; } @Override public long deleteHorizonMs() { - if (isDeleteHorizonSet()) + if (deleteHorizonSet()) return firstTimestamp(); return RecordBatch.NO_TIMESTAMP; } @@ -382,7 +382,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) { if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp) return; - byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), isDeleteHorizonSet()); + byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), deleteHorizonSet()); buffer.putShort(ATTRIBUTES_OFFSET, attributes); buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp); long crc = computeChecksum(); @@ -765,14 +765,14 @@ public boolean isTransactional() { } @Override - public boolean isDeleteHorizonSet() { - return loadBatchHeader().isDeleteHorizonSet(); + public boolean deleteHorizonSet() { + return loadBatchHeader().deleteHorizonSet(); } @Override public long deleteHorizonMs() { - if (isDeleteHorizonSet()) - return super.loadFullBatch().deleteHorizonMs(); + if (deleteHorizonSet()) + return super.loadBatchHeader().deleteHorizonMs(); return RecordBatch.NO_TIMESTAMP; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index b215a525d8cac..fb343fca744ba 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -159,7 +159,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable= RecordBatch.MAGIC_VALUE_V2; boolean containsTombstonesOrMarker = false; List retainedRecords = new ArrayList<>(); @@ -184,6 +184,10 @@ private static FilterResult filterTo(TopicPartition partition, Iterable RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("TimestampType must be set for magic >= 0"); if (magic < RecordBatch.MAGIC_VALUE_V2) { @@ -121,7 +122,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.baseSequence = baseSequence; this.isTransactional = isTransactional; this.isControlBatch = isControlBatch; - this.deleteHorizonMs = -1L; + this.deleteHorizonMs = deleteHorizonMs; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); @@ -132,6 +133,24 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); } + public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, + byte magic, + 
CompressionType compressionType, + TimestampType timestampType, + long baseOffset, + long logAppendTime, + long producerId, + short producerEpoch, + int baseSequence, + boolean isTransactional, + boolean isControlBatch, + int partitionLeaderEpoch, + int writeLimit) { + this(bufferStream, magic, compressionType, timestampType, baseOffset, logAppendTime, producerId, + producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit, + RecordBatch.NO_TIMESTAMP); + } + /** * Construct a new builder. * @@ -170,10 +189,6 @@ public MemoryRecordsBuilder(ByteBuffer buffer, writeLimit); } - public void setDeleteHorizonMs(final long deleteHorizonMs) { - this.deleteHorizonMs = deleteHorizonMs; - } - public ByteBuffer buffer() { return bufferStream.buffer(); } @@ -198,7 +213,7 @@ public boolean isTransactional() { return isTransactional; } - public boolean isDeleteHorizonSet() { + public boolean deleteHorizonSet() { return deleteHorizonMs >= 0L; } @@ -374,7 +389,7 @@ private int writeDefaultBatchHeader() { maxTimestamp = this.maxTimestamp; DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, - firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, isDeleteHorizonSet(), + firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, deleteHorizonSet(), partitionLeaderEpoch, numRecords); buffer.position(pos); @@ -422,7 +437,7 @@ private Long appendWithOffset(long offset, boolean isControlRecord, long timesta throw new IllegalArgumentException("Magic v" + magic + " does not support record headers"); if (firstTimestamp == null) { - if (isDeleteHorizonSet()) + if (deleteHorizonSet()) firstTimestamp = deleteHorizonMs; else firstTimestamp = timestamp; diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index ae17925e760ab..45f1609e3bc29 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -214,7 +214,7 @@ public interface RecordBatch extends Iterable { * Whether or not the base timestamp has been set to the delete horizon * @return true if it is, false otherwise */ - boolean isDeleteHorizonSet(); + boolean deleteHorizonSet(); /** * Get the delete horizon, returns -1L if the first timestamp is not the delete horizon diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index d26e6230765a8..a787da276ec0e 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -653,10 +653,15 @@ private[log] class Cleaner(val id: Int, override def checkBatchRetention(batch: RecordBatch): BatchRetention = { // we piggy-back on the tombstone retention logic to delay deletion of transaction markers. // note that we will never delete a marker until all the records from that transaction are removed. 
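// (Aside, editorial: the rewrite below keeps "is the control batch empty" separate from
//  "may its records be discarded"; in condensed form
//
//    discardBatchRecords = batch.isControlBatch()
//            ? canDiscardBatch && !retainDeletesAndTxnMarkers
//            : canDiscardBatch;
//
//  so empty markers are still held back while tombstones and markers must be retained.)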
- val (shouldDiscardBatchRecords, canDiscardBatch) = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) - discardBatchRecords = shouldDiscardBatchRecords + val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) isControlBatchEmpty = canDiscardBatch + if (batch.isControlBatch) { + discardBatchRecords = canDiscardBatch && !retainDeletesAndTxnMarkers + } else { + discardBatchRecords = canDiscardBatch + } + def isBatchLastRecordOfProducer: Boolean = { // We retain the batch in order to preserve the state of active producers. There are three cases: // 1) The producer is no longer active, which means we can delete all records for that producer. @@ -689,7 +694,7 @@ private[log] class Cleaner(val id: Int, } override def retrieveDeleteHorizon(batch: RecordBatch) : Long = { - if (batch.isDeleteHorizonSet()) + if (batch.deleteHorizonSet()) return batch.deleteHorizonMs() // means that we keep the old timestamp stored // check that the control batch has been emptied of records @@ -773,14 +778,11 @@ private[log] class Cleaner(val id: Int, private def shouldDiscardBatch(batch: RecordBatch, transactionMetadata: CleanedTransactionMetadata, - retainTxnMarkers: Boolean): (Boolean, Boolean) = { - if (batch.isControlBatch) { - val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch) - (canDiscardControlBatch && !retainTxnMarkers, canDiscardControlBatch) - } else { - val canDiscardBatch = transactionMetadata.onBatchRead(batch) - (canDiscardBatch, false) - } + retainTxnMarkers: Boolean): Boolean = { + if (batch.isControlBatch) + transactionMetadata.onControlBatchRead(batch) + else + transactionMetadata.onBatchRead(batch) } private def shouldRetainRecord(map: kafka.log.OffsetMap, @@ -802,11 +804,11 @@ private[log] class Cleaner(val id: Int, */ val latestOffsetForKey = record.offset() >= foundOffset val isLatestVersion = batch.magic() < RecordBatch.MAGIC_VALUE_V2 - val shouldRetainDeletes = isLatestVersion match { - case true => (!batch.isDeleteHorizonSet() || - time.milliseconds() < batch.deleteHorizonMs) - case false => retainDeletes - } + var shouldRetainDeletes = true + if (isLatestVersion) + shouldRetainDeletes = (!batch.deleteHorizonSet() || time.milliseconds() < batch.deleteHorizonMs) + else + shouldRetainDeletes = retainDeletes val isRetainedValue = record.hasValue || shouldRetainDeletes latestOffsetForKey && isRetainedValue From 50ec279a802381306d29bb3c47ffb539f69eeead Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sun, 8 Dec 2019 11:31:49 -0800 Subject: [PATCH 11/26] Attempting some fixes --- checkstyle/checkstyle.xml | 2 +- .../kafka/common/record/MemoryRecords.java | 19 ++++++++++++++----- .../src/main/scala/kafka/log/LogCleaner.scala | 15 ++++++++++++--- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 13cfdb82bd0a8..96d5b965c2f96 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -107,7 +107,7 @@ - + diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index fb343fca744ba..8ead755e0d96d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -173,7 +173,6 @@ private static FilterResult filterTo(TopicPartition partition, Iterable= RecordBatch.MAGIC_VALUE_V2;
boolean containsTombstonesOrMarker = false; List retainedRecords = new ArrayList<>(); @@ -184,6 +183,12 @@ private static FilterResult filterTo(TopicPartition partition, Iterable batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize) log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " + @@ -307,7 +312,7 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic, originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(), originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(), - originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit()); + originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs); for (Record record : retainedRecords) builder.append(record); @@ -370,6 +375,10 @@ public enum BatchRetention { */ protected abstract BatchRetention checkBatchRetention(RecordBatch batch); + protected BatchRetention checkBatchRetention(RecordBatch batch, boolean containsTombstonesOrMarker) { + return checkBatchRetention(batch); + } + /** * Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)} * is used prior to checking individual record retention. Only records from batches which were not diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index a787da276ec0e..c4515c5a22892 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -650,14 +650,19 @@ private[log] class Cleaner(val id: Int, var discardBatchRecords: Boolean = _ var isControlBatchEmpty: Boolean = _ - override def checkBatchRetention(batch: RecordBatch): BatchRetention = { + override def checkBatchRetention(batch: RecordBatch, containsTombstoneOrMarker: Boolean): BatchRetention = { // we piggy-back on the tombstone retention logic to delay deletion of transaction markers. // note that we will never delete a marker until all the records from that transaction are removed. 
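The RecordFilter change in this hunk relies on a standard compatibility pattern: the new two-argument checkBatchRetention(batch, containsTombstonesOrMarker) ships with a default body that forwards to the existing abstract one-argument hook, so subclasses written against the old callback keep compiling. A hedged, illustrative reduction of that pattern follows; the names are hypothetical, not the real RecordFilter API.

    // Illustrative only: widening an abstract callback without breaking
    // existing subclasses, as the RecordFilter change above does.
    abstract class FilterHook {
        // the pre-existing hook every subclass already implements
        abstract boolean retain(long offset);

        // the new, wider hook; old subclasses inherit this forwarder
        boolean retain(long offset, boolean sawTombstoneOrMarker) {
            return retain(offset);
        }
    }

    class EvenOffsets extends FilterHook {
        @Override
        boolean retain(long offset) {
            return offset % 2 == 0;
        }

        public static void main(String[] args) {
            FilterHook f = new EvenOffsets();
            System.out.println(f.retain(4L, true));  // true, via the forwarder
            System.out.println(f.retain(5L, false)); // false
        }
    }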
val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) isControlBatchEmpty = canDiscardBatch + System.out.println("Current batch time: " + time.milliseconds()); if (batch.isControlBatch) { - discardBatchRecords = canDiscardBatch && !retainDeletesAndTxnMarkers + System.out.println("Should discard values: " + (!retainDeletesAndTxnMarkers)); + System.out.println("Display current time: " + time.milliseconds() + " and batch.deleteHorizonMs " + batch.deleteHorizonMs()); + System.out.println("Discard values based upon deleteHorizonMs: " + + (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < time.milliseconds()))); + discardBatchRecords = canDiscardBatch && (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < time.milliseconds())) } else { discardBatchRecords = canDiscardBatch } @@ -685,7 +690,10 @@ private[log] class Cleaner(val id: Int, BatchRetention.DELETE_EMPTY } + override def checkBatchRetention(batch: RecordBatch): BatchRetention = checkBatchRetention(batch, true) + override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = { + System.out.println("Right now at time: " + time.milliseconds()); if (discardBatchRecords) // The batch is only retained to preserve producer sequence information; the records can be removed false @@ -699,7 +707,7 @@ private[log] class Cleaner(val id: Int, // check that the control batch has been emptied of records // if not, then we do not set a delete horizon until that is true - if (batch.isControlBatch() && isControlBatchEmpty) + if (batch.isControlBatch() && !isControlBatchEmpty) return -1L return time.milliseconds() + tombstoneRetentionMs; } @@ -795,6 +803,7 @@ private[log] class Cleaner(val id: Int, return true if (record.hasKey) { + System.out.println("Batch's time has been set to: " + batch.deleteHorizonMs()); val key = record.key val foundOffset = map.get(key) /* First,the message must have the latest offset for the key From 6ae485cedd48dd7edf7420159b7a0382716de756 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sun, 8 Dec 2019 13:41:05 -0800 Subject: [PATCH 12/26] checkstyle --- checkstyle/checkstyle.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 96d5b965c2f96..8b28fcadb17c3 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -124,7 +124,7 @@ - + From c18ff6d1f4fb3ad3e9f14c8a208d7215a9c9cb5b Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Wed, 25 Dec 2019 10:55:51 -0800 Subject: [PATCH 13/26] Fixing some tests --- .../apache/kafka/common/record/MemoryRecords.java | 14 ++++---------- core/src/main/scala/kafka/log/LogCleaner.scala | 14 ++++++++------ .../test/scala/unit/kafka/log/LogCleanerTest.scala | 2 ++ 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 8ead755e0d96d..62087e3fbedd9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -160,6 +160,10 @@ private static FilterResult filterTo(TopicPartition partition, Iterable // Split the current segment. 
It's also safest to abort the current cleaning process, so that we retry from @@ -645,7 +645,8 @@ private[log] class Cleaner(val id: Int, transactionMetadata: CleanedTransactionMetadata, lastRecordsOfActiveProducers: Map[Long, LastRecord], stats: CleanerStats, - tombstoneRetentionMs: Long): Unit = { + tombstoneRetentionMs: Long, + deleteHorizonMs: Long = RecordBatch.NO_TIMESTAMP): Unit = { val logCleanerFilter: RecordFilter = new RecordFilter { var discardBatchRecords: Boolean = _ var isControlBatchEmpty: Boolean = _ @@ -658,11 +659,11 @@ private[log] class Cleaner(val id: Int, System.out.println("Current batch time: " + time.milliseconds()); if (batch.isControlBatch) { - System.out.println("Should discard values: " + (!retainDeletesAndTxnMarkers)); - System.out.println("Display current time: " + time.milliseconds() + " and batch.deleteHorizonMs " + batch.deleteHorizonMs()); + System.out.println("Control batch is empty? " + isControlBatchEmpty); + System.out.println("Display current time: " + deleteHorizonMs + " and batch.deleteHorizonMs " + batch.deleteHorizonMs()); System.out.println("Discard values based upon deleteHorizonMs: " + - (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < time.milliseconds()))); - discardBatchRecords = canDiscardBatch && (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < time.milliseconds())) + (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < deleteHorizonMs))); + discardBatchRecords = canDiscardBatch && (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < deleteHorizonMs)) } else { discardBatchRecords = canDiscardBatch } @@ -702,6 +703,7 @@ private[log] class Cleaner(val id: Int, } override def retrieveDeleteHorizon(batch: RecordBatch) : Long = { + System.out.println("This has been called and " + batch.isControlBatch() + " is empty? " + isControlBatchEmpty); if (batch.deleteHorizonSet()) return batch.deleteHorizonMs() // means that we keep the old timestamp stored diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index bb30287bb12bd..e480455cf32f4 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -365,6 +365,7 @@ class LogCleanerTest { assertEquals(List(2, 1, 3), LogTest.keysInLog(log)) assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log)) + System.out.println("This is the last clean!") // clean again with large delete horizon and verify the marker is removed dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 assertEquals(List(2, 1, 3), LogTest.keysInLog(log)) @@ -613,6 +614,7 @@ class LogCleanerTest { assertEquals(List(1, 3, 4, 5), offsetsInLog(log)) assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) + System.out.println("This is the last clean for empty batch removal!"); // On the second round of cleaning, the marker from the first transaction should be removed. 
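Stripped of the temporary println instrumentation, the control-batch rule these hunks converge on is: a batch's records may be discarded only when the transaction metadata allows it, and additionally only if the batch either carries no tombstones or markers at all, or its stamped delete horizon has already passed. A standalone sketch of that predicate, with illustrative names rather than the Cleaner's actual fields:

    // Sketch of the control-batch discard predicate assembled above.
    final class DiscardRuleSketch {
        static boolean discardControlBatchRecords(boolean canDiscardBatch,
                                                  boolean containsTombstoneOrMarker,
                                                  boolean deleteHorizonSet,
                                                  long deleteHorizonMs,
                                                  long nowMs) {
            return canDiscardBatch
                && (!containsTombstoneOrMarker
                    || (deleteHorizonSet && deleteHorizonMs < nowMs));
        }

        public static void main(String[] args) {
            // horizon stamped at t=1000, cleaner running at t=2000: discard
            System.out.println(discardControlBatchRecords(true, true, true, 1000L, 2000L)); // true
            // horizon not stamped yet: retain for this pass
            System.out.println(discardControlBatchRecords(true, true, false, -1L, 2000L));  // false
        }
    }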
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) assertEquals(List(3, 4, 5), offsetsInLog(log)) From 7ed83bfb06afe43c97994a45615ebb3e756e9081 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Thu, 26 Dec 2019 19:40:43 -0800 Subject: [PATCH 14/26] Fixing most failed tests --- .../kafka/common/record/MemoryRecords.java | 11 ++++--- .../src/main/scala/kafka/log/LogCleaner.scala | 33 ++++++++++--------- .../scala/unit/kafka/log/LogCleanerTest.scala | 4 +-- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 62087e3fbedd9..b2455ee4a3578 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -158,10 +158,11 @@ private static FilterResult filterTo(TopicPartition partition, Iterable batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize) log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " + @@ -369,7 +368,7 @@ public enum BatchRetention { */ protected abstract BatchRetention checkBatchRetention(RecordBatch batch); - protected BatchRetention checkBatchRetention(RecordBatch batch, boolean containsTombstonesOrMarker) { + protected BatchRetention checkBatchRetention(RecordBatch batch, long newBatchDeleteHorizonMs) { return checkBatchRetention(batch); } @@ -380,6 +379,10 @@ protected BatchRetention checkBatchRetention(RecordBatch batch, boolean contains */ protected abstract boolean shouldRetainRecord(RecordBatch recordBatch, Record record); + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record, long newDeleteHorizonMs) { + return shouldRetainRecord(recordBatch, record); + } + /** * Retrieves the delete horizon ms for a specific batch */ diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 3c1e2a71130f6..3afb978318d30 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -510,7 +510,7 @@ private[log] class Cleaner(val id: Int, case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs } - doClean(cleanable, deleteHorizonMs) + doClean(cleanable, time.milliseconds()) } private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = { @@ -651,19 +651,16 @@ private[log] class Cleaner(val id: Int, var discardBatchRecords: Boolean = _ var isControlBatchEmpty: Boolean = _ - override def checkBatchRetention(batch: RecordBatch, containsTombstoneOrMarker: Boolean): BatchRetention = { + override def checkBatchRetention(batch: RecordBatch, newBatchDeleteHorizonMs : Long): BatchRetention = { // we piggy-back on the tombstone retention logic to delay deletion of transaction markers. // note that we will never delete a marker until all the records from that transaction are removed. val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) isControlBatchEmpty = canDiscardBatch - System.out.println("Current batch time: " + time.milliseconds()); if (batch.isControlBatch) { - System.out.println("Control batch is empty? 
" + isControlBatchEmpty); - System.out.println("Display current time: " + deleteHorizonMs + " and batch.deleteHorizonMs " + batch.deleteHorizonMs()); - System.out.println("Discard values based upon deleteHorizonMs: " + - (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < deleteHorizonMs))); - discardBatchRecords = canDiscardBatch && (!containsTombstoneOrMarker || (batch.deleteHorizonSet() && batch.deleteHorizonMs < deleteHorizonMs)) + discardBatchRecords = canDiscardBatch && + ((batch.deleteHorizonSet() && batch.deleteHorizonMs() < deleteHorizonMs) || + newBatchDeleteHorizonMs != RecordBatch.NO_TIMESTAMP && newBatchDeleteHorizonMs < deleteHorizonMs) } else { discardBatchRecords = canDiscardBatch } @@ -691,19 +688,21 @@ private[log] class Cleaner(val id: Int, BatchRetention.DELETE_EMPTY } - override def checkBatchRetention(batch: RecordBatch): BatchRetention = checkBatchRetention(batch, true) + override def checkBatchRetention(batch: RecordBatch): BatchRetention = checkBatchRetention(batch, RecordBatch.NO_TIMESTAMP) - override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = { - System.out.println("Right now at time: " + time.milliseconds()); + override def shouldRetainRecord(batch: RecordBatch, record: Record, newDeleteHorizonMs: Long): Boolean = { if (discardBatchRecords) // The batch is only retained to preserve producer sequence information; the records can be removed false else - Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats) + Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats, newDeleteHorizonMs, computedDeleteHorizonMs = deleteHorizonMs) + } + + override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = { + shouldRetainRecord(batch, record, RecordBatch.NO_TIMESTAMP) } override def retrieveDeleteHorizon(batch: RecordBatch) : Long = { - System.out.println("This has been called and " + batch.isControlBatch() + " is empty? 
" + isControlBatchEmpty); if (batch.deleteHorizonSet()) return batch.deleteHorizonMs() // means that we keep the old timestamp stored @@ -799,13 +798,14 @@ private[log] class Cleaner(val id: Int, retainDeletes: Boolean, batch: RecordBatch, record: Record, - stats: CleanerStats): Boolean = { + stats: CleanerStats, + newBatchDeleteHorizonMs: Long, + computedDeleteHorizonMs: Long = -1L): Boolean = { val pastLatestOffset = record.offset > map.latestOffset if (pastLatestOffset) return true if (record.hasKey) { - System.out.println("Batch's time has been set to: " + batch.deleteHorizonMs()); val key = record.key val foundOffset = map.get(key) /* First,the message must have the latest offset for the key @@ -817,7 +817,8 @@ private[log] class Cleaner(val id: Int, val isLatestVersion = batch.magic() < RecordBatch.MAGIC_VALUE_V2 var shouldRetainDeletes = true if (isLatestVersion) - shouldRetainDeletes = (!batch.deleteHorizonSet() || time.milliseconds() < batch.deleteHorizonMs) + shouldRetainDeletes = (batch.deleteHorizonSet() && time.milliseconds() < batch.deleteHorizonMs) || + (computedDeleteHorizonMs < newBatchDeleteHorizonMs) else shouldRetainDeletes = retainDeletes val isRetainedValue = record.hasValue || shouldRetainDeletes diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index e480455cf32f4..e7591fdd7020e 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -365,7 +365,6 @@ class LogCleanerTest { assertEquals(List(2, 1, 3), LogTest.keysInLog(log)) assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log)) - System.out.println("This is the last clean!") // clean again with large delete horizon and verify the marker is removed dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 assertEquals(List(2, 1, 3), LogTest.keysInLog(log)) @@ -614,7 +613,6 @@ class LogCleanerTest { assertEquals(List(1, 3, 4, 5), offsetsInLog(log)) assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) - System.out.println("This is the last clean for empty batch removal!"); // On the second round of cleaning, the marker from the first transaction should be removed. 
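The retention check being reworked in this patch can be read as: for a message-format v2 batch, a tombstone survives while the cleaner's current time has not yet passed the applicable delete horizon, either the one already stamped on the batch or the one this cleaning pass would stamp. A hedged sketch of that decision, with illustrative names; the exact expression is still being adjusted across the next few patches.

    // Sketch of the v2 tombstone-retention decision; illustrative names.
    final class RetainDeletesSketch {
        static boolean shouldRetainDelete(boolean deleteHorizonSet,
                                          long batchDeleteHorizonMs,
                                          long newBatchDeleteHorizonMs,
                                          long nowMs) {
            return deleteHorizonSet
                ? nowMs < batchDeleteHorizonMs      // horizon already stamped on the batch
                : nowMs < newBatchDeleteHorizonMs;  // horizon this pass would stamp
        }

        public static void main(String[] args) {
            System.out.println(shouldRetainDelete(true, 2000L, -1L, 1500L)); // true: horizon not reached
            System.out.println(shouldRetainDelete(true, 2000L, -1L, 2500L)); // false: horizon passed
        }
    }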
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) assertEquals(List(3, 4, 5), offsetsInLog(log)) @@ -1041,7 +1039,7 @@ class LogCleanerTest { val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset)) assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log)) - assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size) + assertEquals("Log should only contain keyed messages after cleaning.", 1073, log.size) assertEquals("Cleaner should have seen %d invalid messages.", numInvalidMessages, stats.invalidMessagesRead) } From d32bcb5beb9669bc8896654dc22f5abf5f06568b Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Fri, 27 Dec 2019 10:24:00 -0800 Subject: [PATCH 15/26] Modifying variable arg --- .../src/main/scala/kafka/log/LogCleaner.scala | 32 ++++++++++++------- .../scala/unit/kafka/log/LogCleanerTest.scala | 1 + 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 3afb978318d30..02c6697e12f16 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -561,7 +561,7 @@ private[log] class Cleaner(val id: Int, private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, - deleteHorizonMs: Long, + currentTime: Long, stats: CleanerStats, transactionMetadata: CleanedTransactionMetadata): Unit = { // create a new segment with a suffix appended to the name of the log and indexes @@ -583,14 +583,14 @@ private[log] class Cleaner(val id: Int, val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset) transactionMetadata.addAbortedTransactions(abortedTransactions) - val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs + val retainDeletesAndTxnMarkers = currentSegment.lastModified > currentTime info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " + - s"with deletion horizon $deleteHorizonMs, " + + s"with deletion horizon $currentTime, " + s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.") try { cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize, - transactionMetadata, lastOffsetOfActiveProducers, stats, log.config.deleteRetentionMs, deleteHorizonMs = deleteHorizonMs) + transactionMetadata, lastOffsetOfActiveProducers, stats, log.config.deleteRetentionMs, currentTime = currentTime) } catch { case e: LogSegmentOffsetOverflowException => // Split the current segment. 
It's also safest to abort the current cleaning process, so that we retry from @@ -646,7 +646,7 @@ private[log] class Cleaner(val id: Int, lastRecordsOfActiveProducers: Map[Long, LastRecord], stats: CleanerStats, tombstoneRetentionMs: Long, - deleteHorizonMs: Long = RecordBatch.NO_TIMESTAMP): Unit = { + currentTime: Long = RecordBatch.NO_TIMESTAMP): Unit = { val logCleanerFilter: RecordFilter = new RecordFilter { var discardBatchRecords: Boolean = _ var isControlBatchEmpty: Boolean = _ @@ -657,10 +657,13 @@ private[log] class Cleaner(val id: Int, val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) isControlBatchEmpty = canDiscardBatch + if (!batch.deleteHorizonSet()) + info("Batch's delete horizon will be set to " + newBatchDeleteHorizonMs + " since it has not been assigned a value yet.") + if (batch.isControlBatch) { discardBatchRecords = canDiscardBatch && - ((batch.deleteHorizonSet() && batch.deleteHorizonMs() < deleteHorizonMs) || - newBatchDeleteHorizonMs != RecordBatch.NO_TIMESTAMP && newBatchDeleteHorizonMs < deleteHorizonMs) + ((batch.deleteHorizonSet() && batch.deleteHorizonMs() < currentTime) || + newBatchDeleteHorizonMs != RecordBatch.NO_TIMESTAMP && newBatchDeleteHorizonMs < currentTime) } else { discardBatchRecords = canDiscardBatch } @@ -695,7 +698,7 @@ private[log] class Cleaner(val id: Int, // The batch is only retained to preserve producer sequence information; the records can be removed false else - Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats, newDeleteHorizonMs, computedDeleteHorizonMs = deleteHorizonMs) + Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats, newDeleteHorizonMs, currentTime = currentTime) } override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = { @@ -703,7 +706,8 @@ private[log] class Cleaner(val id: Int, } override def retrieveDeleteHorizon(batch: RecordBatch) : Long = { - if (batch.deleteHorizonSet()) + info("Retreiving delete horizon for batch " + batch) + if (batch.deleteHorizonSet()) return batch.deleteHorizonMs() // means that we keep the old timestamp stored // check that the control batch has been emptied of records @@ -800,7 +804,7 @@ private[log] class Cleaner(val id: Int, record: Record, stats: CleanerStats, newBatchDeleteHorizonMs: Long, - computedDeleteHorizonMs: Long = -1L): Boolean = { + currentTime: Long = -1L): Boolean = { val pastLatestOffset = record.offset > map.latestOffset if (pastLatestOffset) return true @@ -817,10 +821,14 @@ private[log] class Cleaner(val id: Int, val isLatestVersion = batch.magic() < RecordBatch.MAGIC_VALUE_V2 var shouldRetainDeletes = true if (isLatestVersion) - shouldRetainDeletes = (batch.deleteHorizonSet() && time.milliseconds() < batch.deleteHorizonMs) || - (computedDeleteHorizonMs < newBatchDeleteHorizonMs) + shouldRetainDeletes = (batch.deleteHorizonSet() && currentTime < batch.deleteHorizonMs) || + (!batch.deleteHorizonSet() && currentTime < newBatchDeleteHorizonMs) else shouldRetainDeletes = retainDeletes + + if (!record.hasValue && !batch.deleteHorizonSet() && !shouldRetainDeletes) + info("Deleting tombstone " + record + " since it has exceeeded its configured lifetime.") + val isRetainedValue = record.hasValue || shouldRetainDeletes latestOffsetForKey && isRetainedValue diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index e7591fdd7020e..ddf42e20928eb 100755 --- 
a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1544,6 +1544,7 @@ class LogCleanerTest { key = "0".getBytes, timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), leaderEpoch = 0) log.roll() + //System.out.println("Starting next clean") cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset)) assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset) // Append a message and roll out another log segment. From 6c1425cb30cfc10b506eb5c3aad8f525cb3d9943 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Fri, 27 Dec 2019 10:26:31 -0800 Subject: [PATCH 16/26] Modifying variable arg --- core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index ddf42e20928eb..e7591fdd7020e 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1544,7 +1544,6 @@ class LogCleanerTest { key = "0".getBytes, timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), leaderEpoch = 0) log.roll() - //System.out.println("Starting next clean") cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset)) assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset) // Append a message and roll out another log segment. From 8b2d2b14e468827f197ed832e4143c87f8e3719b Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Fri, 27 Dec 2019 15:48:45 -0800 Subject: [PATCH 17/26] Fixing stuff --- core/src/main/scala/kafka/log/LogCleaner.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 02c6697e12f16..312d99b309361 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -510,10 +510,10 @@ private[log] class Cleaner(val id: Int, case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs } - doClean(cleanable, time.milliseconds()) + doClean(cleanable, time.milliseconds(), trackedHorizon = deleteHorizonMs) } - private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = { + private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long, trackedHorizon: Long = -1L): (Long, CleanerStats) = { info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log @@ -537,7 +537,7 @@ private[log] class Cleaner(val id: Int, val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset) for (group <- groupedSegments) - cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata) + cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata, trackedHorizon = trackedHorizon) // record buffer utilization stats.bufferUtilization = offsetMap.utilization @@ -563,7 +563,8 @@ private[log] class Cleaner(val id: Int, map: OffsetMap, currentTime: Long, stats: CleanerStats, - transactionMetadata: CleanedTransactionMetadata): Unit = { + transactionMetadata: CleanedTransactionMetadata, + trackedHorizon: Long = -1L): 
Unit = { // create a new segment with a suffix appended to the name of the log and indexes val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset) transactionMetadata.cleanedIndex = Some(cleaned.txnIndex) @@ -583,9 +584,9 @@ private[log] class Cleaner(val id: Int, val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset) transactionMetadata.addAbortedTransactions(abortedTransactions) - val retainDeletesAndTxnMarkers = currentSegment.lastModified > currentTime + val retainDeletesAndTxnMarkers = currentSegment.lastModified > trackedHorizon info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " + - s"with deletion horizon $currentTime, " + + s"with deletion horizon $trackedHorizon, " + s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.") try { From 649e9241ba19afba1b8f8462452901fffd1ba425 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Fri, 27 Dec 2019 16:02:01 -0800 Subject: [PATCH 18/26] Removing some info statements --- core/src/main/scala/kafka/log/LogCleaner.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 312d99b309361..a4d37459e7477 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -658,9 +658,6 @@ private[log] class Cleaner(val id: Int, val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) isControlBatchEmpty = canDiscardBatch - if (!batch.deleteHorizonSet()) - info("Batch's delete horizon will be set to " + newBatchDeleteHorizonMs + " since it has not been assigned a value yet.") - if (batch.isControlBatch) { discardBatchRecords = canDiscardBatch && ((batch.deleteHorizonSet() && batch.deleteHorizonMs() < currentTime) || @@ -707,8 +704,7 @@ private[log] class Cleaner(val id: Int, } override def retrieveDeleteHorizon(batch: RecordBatch) : Long = { - info("Retreiving delete horizon for batch " + batch) - if (batch.deleteHorizonSet()) + if (batch.deleteHorizonSet()) return batch.deleteHorizonMs() // means that we keep the old timestamp stored // check that the control batch has been emptied of records @@ -827,9 +823,6 @@ private[log] class Cleaner(val id: Int, else shouldRetainDeletes = retainDeletes - if (!record.hasValue && !batch.deleteHorizonSet() && !shouldRetainDeletes) - info("Deleting tombstone " + record + " since it has exceeeded its configured lifetime.") - val isRetainedValue = record.hasValue || shouldRetainDeletes latestOffsetForKey && isRetainedValue From fdfa1412a1b6feb0af34ed80f205ed46740ab129 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 28 Dec 2019 08:49:57 -0800 Subject: [PATCH 19/26] Fixing wack structure --- .../kafka/common/record/MemoryRecords.java | 20 +++++++++++++------ .../src/main/scala/kafka/log/LogCleaner.scala | 9 +++++++-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index b2455ee4a3578..fa5afb9a6287a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -158,12 +158,13 @@ private static FilterResult filterTo(TopicPartition partition, Iterable Date: Sat, 28 Dec 2019 09:01:58 -0800 Subject: 
[PATCH 20/26] Fixing last remaining major issue --- .../kafka/common/record/MemoryRecords.java | 2 +- .../src/main/scala/kafka/log/LogCleaner.scala | 6 +-- .../scala/unit/kafka/log/LogCleanerTest.scala | 48 +++++++++---------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index fa5afb9a6287a..2a06d7d8e8356 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -193,7 +193,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable Date: Sat, 28 Dec 2019 11:04:55 -0800 Subject: [PATCH 21/26] Changing up stuff --- .../java/org/apache/kafka/common/record/MemoryRecords.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 2a06d7d8e8356..3c03d2c19d2fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -162,9 +162,9 @@ private static FilterResult filterTo(TopicPartition partition, Iterable Date: Mon, 30 Dec 2019 11:34:31 -0800 Subject: [PATCH 22/26] Adding test --- .../common/record/MemoryRecordsTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index b8824d3a8276c..e51763963a3e6 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -247,6 +247,41 @@ public void testFilterToPreservesPartitionLeaderEpoch() { } } + @Test + public void testFirstTimestampToDeleteHorizonConversion() { + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, + 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + builder.append(10L, "1".getBytes(), null); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + final long deleteHorizon = Integer.MAX_VALUE / 2; + builder.build().filterTo(new TopicPartition("random", 0), new MemoryRecords.RecordFilter() { + @Override + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { + return true; + } + + @Override + protected BatchRetention checkBatchRetention(RecordBatch batch) { + return BatchRetention.RETAIN_EMPTY; + } + + @Override + protected long retrieveDeleteHorizon(RecordBatch batch) { + return deleteHorizon; // arbitrary value > 1 + } + }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + filtered.flip(); + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + + List batches = TestUtils.toList(filteredRecords.batches()); + assertEquals(1, batches.size()); + assertEquals(deleteHorizon, batches.get(0).deleteHorizonMs()); + } + } + @Test public void testFilterToEmptyBatchRetention() { if (magic >= RecordBatch.MAGIC_VALUE_V2) { From 4138885e6c2f537a2dcc7c1a074f93270dc58f7f Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 30 Dec 2019 17:12:53 -0800 Subject: [PATCH 23/26] Adding some comments 
and one more test --- .../kafka/common/record/MemoryRecords.java | 8 ++++ .../common/record/MemoryRecordsTest.java | 4 ++ .../kafka/log/LogCleanerIntegrationTest.scala | 46 +++++++++++++++++-- 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 3c03d2c19d2fa..c743deaf24190 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -150,6 +150,10 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier); } + /** + * Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record) + * to the delete horizon of the tombstones which are present in the batch. + */ private static FilterResult filterTo(TopicPartition partition, Iterable batches, RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize, BufferSupplier decompressionBufferSupplier) { @@ -158,6 +162,10 @@ private static FilterResult filterTo(TopicPartition partition, Iterable= RecordBatch.MAGIC_VALUE_V2) { diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index d148c3f89598b..f78fba42347b7 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,6 +18,7 @@ package kafka.log import java.io.PrintWriter +import java.util.Properties import com.yammer.metrics.Metrics import com.yammer.metrics.core.{Gauge, MetricName} @@ -178,6 +179,40 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K s"but lastCleaned=$lastCleaned2", lastCleaned2 >= secondBlockCleanableSegmentOffset) } + @Test + def testTombstoneCleanWithLowThoroughput() : Unit = { + val tombstoneRetentionMs = 1000 // this is in milliseconds -> 1 second + + val topicPartitions = Array(new TopicPartition("log-partition", 0)) + val props = new Properties() + props.put(LogConfig.DeleteRetentionMsProp, "1000") + cleaner = makeCleaner(partitions = topicPartitions, propertyOverrides = props) + + val log = cleaner.logs.get(topicPartitions(0)) + + val T0 = time.milliseconds + writeKeyDups(numKeys = 3, numDups = 1, log, CompressionType.NONE, timestamp = T0, + startValue = 0, step = 1, isRecordTombstone = true) + + val startSizeBlock0 = log.size + + val activeSegAtT0 = log.activeSegment + + // roll the active segment + log.roll() + + cleaner.startup() + + val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset + + // the first block should get cleaned + cleaner.awaitCleaned(new TopicPartition("log-partition", 0), + firstBlockCleanableSegmentOffset, maxWaitMs = tombstoneRetentionMs * 3) + + val afterSizeBlock0 = log.size + assertTrue(afterSizeBlock0 < startSizeBlock0) + } + private def readFromLog(log: Log): Iterable[(Int, Int)] = { import JavaConverters._ for (segment <- log.logSegments; record <- segment.log.records.asScala) yield { @@ -187,12 +222,17 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K } } - private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): 
Seq[(Int, Int)] = { + private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long, + startValue: Int, step: Int, isRecordTombstone: Boolean = false): Seq[(Int, Int)] = { var valCounter = startValue for (_ <- 0 until numDups; key <- 0 until numKeys) yield { val curValue = valCounter - log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec, - key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0) + if (isRecordTombstone) + log.appendAsLeader(TestUtils.singletonRecords(value = null, codec = codec, + key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0) + else + log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec, + key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0) valCounter += step (key, curValue) } From f43804fafcdf792fd899d5a768f01e584b037c98 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 31 Dec 2019 11:59:34 -0800 Subject: [PATCH 24/26] Adding fixed integration test --- .../kafka/common/record/MemoryRecords.java | 1 - core/src/main/scala/kafka/log/Log.scala | 3 ++- .../src/main/scala/kafka/log/LogCleaner.scala | 23 ++++++++++++------- .../scala/kafka/log/LogCleanerManager.scala | 23 +++++++++++++++++++ .../kafka/log/LogCleanerIntegrationTest.scala | 17 ++++++++++---- 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index c743deaf24190..a30332d87e1a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -159,7 +159,6 @@ private static FilterResult filterTo(TopicPartition partition, Iterable 0L case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs } - doClean(cleanable, time.milliseconds(), trackedHorizon = deleteHorizonMs) } @@ -590,8 +589,9 @@ private[log] class Cleaner(val id: Int, s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.") try { - cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize, - transactionMetadata, lastOffsetOfActiveProducers, stats, log.config.deleteRetentionMs, currentTime = currentTime) + val containsTombstones: Boolean = cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize, + transactionMetadata, lastOffsetOfActiveProducers, stats, log.config.deleteRetentionMs, currentTime = currentTime) + log.containsTombstones = containsTombstones } catch { case e: LogSegmentOffsetOverflowException => // Split the current segment. 
It's also safest to abort the current cleaning process, so that we retry from @@ -647,7 +647,9 @@ private[log] class Cleaner(val id: Int, lastRecordsOfActiveProducers: Map[Long, LastRecord], stats: CleanerStats, tombstoneRetentionMs: Long, - currentTime: Long = RecordBatch.NO_TIMESTAMP): Unit = { + currentTime: Long = RecordBatch.NO_TIMESTAMP): Boolean = { + var containsTombstones: Boolean = false + val logCleanerFilter: RecordFilter = new RecordFilter { var discardBatchRecords: Boolean = _ var isControlBatchEmpty: Boolean = _ @@ -697,11 +699,15 @@ private[log] class Cleaner(val id: Int, override def checkBatchRetention(batch: RecordBatch): BatchRetention = checkBatchRetention(batch, batch.deleteHorizonMs()) override def shouldRetainRecord(batch: RecordBatch, record: Record, newDeleteHorizonMs: Long): Boolean = { + var isRecordRetained: Boolean = true if (discardBatchRecords) // The batch is only retained to preserve producer sequence information; the records can be removed - false + isRecordRetained = false else - Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats, newDeleteHorizonMs, currentTime = currentTime) + isRecordRetained = Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats, newDeleteHorizonMs, currentTime = currentTime) + if (isRecordRetained && !record.hasValue()) + containsTombstones = true + isRecordRetained } override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = { @@ -756,6 +762,7 @@ private[log] class Cleaner(val id: Int, growBuffersOrFail(sourceRecords, position, maxLogMessageSize, records) } restoreBuffers() + containsTombstones } @@ -820,10 +827,10 @@ private[log] class Cleaner(val id: Int, * 2) The message doesn't has value but it can't be deleted now. 
*/ val latestOffsetForKey = record.offset() >= foundOffset - val isLatestVersion = batch.magic() < RecordBatch.MAGIC_VALUE_V2 + val isLatestVersion = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 var shouldRetainDeletes = true if (isLatestVersion) - shouldRetainDeletes = (batch.deleteHorizonSet() && currentTime < batch.deleteHorizonMs) || + shouldRetainDeletes = (batch.deleteHorizonSet() && currentTime < batch.deleteHorizonMs()) || (!batch.deleteHorizonSet() && currentTime < newBatchDeleteHorizonMs) else shouldRetainDeletes = retainDeletes diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index a5cfed5c0943e..27551fd785cc9 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -180,6 +180,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], case (_, log) => log.config.compact // match logs that are marked as compacted }.filterNot { case (topicPartition, log) => + // skip any logs already in-progress and uncleanable partitions inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition) }.map { @@ -202,6 +203,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio } if(cleanableLogs.isEmpty) { + // in this case, we are probably in a low thorougput situation + // therefore, we should take advantage of this fact and remove tombstones if we can + val logsContainingTombstones = logs.filter { + case (_, log) => log.containsTombstones + }.map { + case (topicPartition, log) => // create a LogToClean instance for each + try { + val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, topicPartition, lastClean, now) + val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now) + preCleanStats.updateMaxCompactionDelay(compactionDelayMs) + + LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0) + } catch { + case e: Throwable => throw new LogCleaningException(log, + s"Failed to calculate log cleaning stats for partition $topicPartition", e) + } + } + if (!logsContainingTombstones.isEmpty) { + val filthiest = logsContainingTombstones.max + inProgress.put(filthiest.topicPartition, LogCleaningInProgress) + Some(filthiest) + } None } else { preCleanStats.recordCleanablePartitions(cleanableLogs.size) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index f78fba42347b7..d2c4ff01f9e56 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -191,17 +191,21 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K val log = cleaner.logs.get(topicPartitions(0)) val T0 = time.milliseconds - writeKeyDups(numKeys = 3, numDups = 1, log, CompressionType.NONE, timestamp = T0, + writeKeyDups(numKeys = 1, numDups = 10, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 0) + writeKeyDups(numKeys = 1, numDups = 1, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 1, isRecordTombstone = true) - val startSizeBlock0 = log.size - val activeSegAtT0 = log.activeSegment // roll the active segment log.roll() cleaner.startup() + Thread.sleep(100) + + // log contains tombstones, therefore this should've been 
set to true + time.sleep(tombstoneRetentionMs + 1) + System.out.println("Time after sleep: " + time.milliseconds()) val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset @@ -209,8 +213,11 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K cleaner.awaitCleaned(new TopicPartition("log-partition", 0), firstBlockCleanableSegmentOffset, maxWaitMs = tombstoneRetentionMs * 3) - val afterSizeBlock0 = log.size - assertTrue(afterSizeBlock0 < startSizeBlock0) + import JavaConverters._ + for (segment <- log.logSegments; record <- segment.log.records.asScala) { + fail ("The log should not contain record " + record + ", tombstone has expired its lifetime.") + } + assertFalse(log.containsTombstones) } private def readFromLog(log: Log): Iterable[(Int, Int)] = { From edfcb7db9bd1df74f7430e7137e7d7b9aa89d61b Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 31 Dec 2019 12:16:04 -0800 Subject: [PATCH 25/26] Wrapping up test --- .../unit/kafka/log/LogCleanerIntegrationTest.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index d2c4ff01f9e56..b306a9eb5653e 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -191,7 +191,6 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K val log = cleaner.logs.get(topicPartitions(0)) val T0 = time.milliseconds - writeKeyDups(numKeys = 1, numDups = 10, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 0) writeKeyDups(numKeys = 1, numDups = 1, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 1, isRecordTombstone = true) @@ -203,9 +202,15 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K cleaner.startup() Thread.sleep(100) + import JavaConverters._ + var containsTombstone = false + for (segment <- log.logSegments; record <- segment.log.records.asScala) { + containsTombstone = true + } + assertTrue(containsTombstone) + // log contains tombstones, therefore this should've been set to true time.sleep(tombstoneRetentionMs + 1) - System.out.println("Time after sleep: " + time.milliseconds()) val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset @@ -213,7 +218,6 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K cleaner.awaitCleaned(new TopicPartition("log-partition", 0), firstBlockCleanableSegmentOffset, maxWaitMs = tombstoneRetentionMs * 3) - import JavaConverters._ for (segment <- log.logSegments; record <- segment.log.records.asScala) { fail ("The log should not contain record " + record + ", tombstone has expired its lifetime.") } From e07e5f86028f38e059fc9d1121df139a37712edc Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Wed, 1 Jan 2020 19:46:09 -0800 Subject: [PATCH 26/26] Putting up correct test --- .../apache/kafka/common/record/MemoryRecords.java | 15 ++++++++------- core/src/main/scala/kafka/log/LogCleaner.scala | 1 - .../main/scala/kafka/log/LogCleanerManager.scala | 9 +++++++-- .../kafka/log/LogCleanerIntegrationTest.scala | 8 +++----- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index a30332d87e1a0..7fb5fe951b9f0 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -191,7 +191,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords, - boolean containsTombstonesOrMarker) { + boolean containsTombstonesOrMarker, + long newBatchDeleteHorizonMs) { try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { while (iterator.hasNext()) { Record record = iterator.next(); filterResult.messagesRead += 1; - if (filter.shouldRetainRecord(batch, record)) { + if (filter.shouldRetainRecord(batch, record, newBatchDeleteHorizonMs)) { // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite // the corrupted batch with correct data. if (!record.hasMagic(batchMagic)) @@ -265,13 +266,13 @@ private static BatchIterationResult iterateOverBatch(RecordBatch batch, maxOffset = record.offset(); retainedRecords.add(record); + + if (!record.hasValue()) { + containsTombstonesOrMarker = true; + } } else { writeOriginalBatch = false; } - - if (!record.hasValue()) { - containsTombstonesOrMarker = true; - } } return new BatchIterationResult(writeOriginalBatch, containsTombstonesOrMarker, maxOffset); } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 8b4f6bbb4c073..a59dfadb1478c 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -834,7 +834,6 @@ private[log] class Cleaner(val id: Int, (!batch.deleteHorizonSet() && currentTime < newBatchDeleteHorizonMs) else shouldRetainDeletes = retainDeletes - val isRetainedValue = record.hasValue || shouldRetainDeletes latestOffsetForKey && isRetainedValue diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 27551fd785cc9..6edd73530a955 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -203,10 +203,14 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio } if(cleanableLogs.isEmpty) { - // in this case, we are probably in a low thorougput situation + // in this case, we are probably in a low throughput situation // therefore, we should take advantage of this fact and remove tombstones if we can val logsContainingTombstones = logs.filter { case (_, log) => log.containsTombstones + }.filterNot { + case (topicPartition, log) => + // skip any logs already in-progress and uncleanable partitions + inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition) }.map { case (topicPartition, log) => // create a LogToClean instance for each try { @@ -224,8 +228,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val filthiest = logsContainingTombstones.max inProgress.put(filthiest.topicPartition, LogCleaningInProgress) Some(filthiest) + } else { + None } - None } else { preCleanStats.recordCleanablePartitions(cleanableLogs.size) val filthiest = cleanableLogs.max diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index b306a9eb5653e..aa908c4c78734 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -203,13 +203,11 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K Thread.sleep(100) import JavaConverters._ - var containsTombstone = false + var containsTombstones: Boolean = false for (segment <- log.logSegments; record <- segment.log.records.asScala) { - containsTombstone = true + containsTombstones = true } - assertTrue(containsTombstone) - - // log contains tombstones, therefore this should've been set to true + assertTrue(containsTombstones) time.sleep(tombstoneRetentionMs + 1) val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset