[KAFKA-8522] Implementing proposal as outlined in KIP-534#7600
[KAFKA-8522] Implementing proposal as outlined in KIP-534#7600ConcurrencyPractitioner wants to merge 26 commits intoapache:trunkfrom ConcurrencyPractitioner:KAFKA-8522
Conversation
|
@junrao @hachikuji Have finished the initial ground work. Will be glad to get some comments. |
|
Retest this please. |
|
@junrao @hachikuji Fixed the issue. Ready for review. |
|
@hachikuji @guozhangwang Have any comments? |
junrao
left a comment
There was a problem hiding this comment.
@ConcurrencyPractitioner : Thanks for the PR. Looks good overall. Left a few comments below.
| lastRecordsOfActiveProducers: Map[Long, LastRecord], | ||
| stats: CleanerStats): Unit = { | ||
| stats: CleanerStats, | ||
| tombstoneRetentionMs: Long): Unit = { |
There was a problem hiding this comment.
Could we add the javadoc for tombstoneRetentionMs?
There was a problem hiding this comment.
Sure.
|
|
||
| @Override | ||
| public long deleteHorizonMs() { | ||
| return -1L; |
There was a problem hiding this comment.
Could we use RecordBatch.NO_TIMESTAMP here too?
| public long deleteHorizonMs() { | ||
| if (isDeleteHorizonSet()) { | ||
| return firstTimestamp(); | ||
| } |
There was a problem hiding this comment.
No need for {} for single line statement.
| } | ||
|
|
||
| public static void writeEmptyHeader(ByteBuffer buffer, | ||
| byte magic, |
There was a problem hiding this comment.
The existing indentation is correct.
| public long deleteHorizonMs() { | ||
| if (isDeleteHorizonSet()) { | ||
| return super.loadFullBatch().deleteHorizonMs(); | ||
| } |
There was a problem hiding this comment.
No need for {} for single line statement.
| if (firstTimestamp == null) { | ||
| if (isDeleteHorizonSet()) { | ||
| firstTimestamp = deleteHorizonMs; | ||
| } else { |
There was a problem hiding this comment.
No need for {} for single line statement.
| val isRetainedValue = record.hasValue || retainDeletes | ||
| val isRetainedValue = record.hasValue || | ||
| (!batch.isDeleteHorizonSet() || | ||
| time.milliseconds() < batch.deleteHorizonMs) |
There was a problem hiding this comment.
We probably should use the new condition only when the record batch is of the latest message format. Otherwise, we could continue to use the old condition.
There was a problem hiding this comment.
Oh, I ran tests, and spotbug main reported dodgy code when I checked if the version was less than the most recent one. Apparently, by this point, we already know that the batch is the latest version (v_2) anyways, so we don't need to check for this.
There was a problem hiding this comment.
Hmm, how do we know at this point that batch is of V2? It seems that it could be any version.
There was a problem hiding this comment.
Alright, will push this to git then. Can't find exactly where it is checked.
|
|
||
| // check that the control batch has been emptied of records | ||
| // if not, then we do not set a delete horizon until that is true | ||
| if (batch.isControlBatch() && transactionMetadata.onControlBatchRead(batch)) |
There was a problem hiding this comment.
Hmm, we already called transactionMetadata.onControlBatchRead(batch) in shouldDiscardBatch(). So, we probably don't want to call it again on the same batch. Instead, we want to pass along the return value to retrieveDeleteHorizon().
There was a problem hiding this comment.
Yep, working on this. Will address this soon.
| if (writeOriginalBatch) { | ||
| // we check if the delete horizon should be set to a new value | ||
| // in which case, we need to reset the base timestamp and overwrite the timestamp deltas | ||
| if (writeOriginalBatch && !shouldSetDeleteHorizon) { |
There was a problem hiding this comment.
We only need to write the DeleteHorizonTime if the batch contains a tombstone or a control record. This increases the chance for more optimized writeOriginalBatch case to happen.
| * ------------------------------------------------------------------------------------------------- | ||
| * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | | ||
| * ------------------------------------------------------------------------------------------------- | ||
| * --------------------------------------------------------------------------------------------------------------------------- |
There was a problem hiding this comment.
Could we add some doc on how Delete Horizon flag is being used?
There was a problem hiding this comment.
Done.
| filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false); | ||
| } else { | ||
| MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream); | ||
| MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, firstTimestamp); |
There was a problem hiding this comment.
We probably only want to set the DeleteHorizonTime if the batch contains tombstone.
There was a problem hiding this comment.
Yeah, I missed that. Should've also checked in the else part of the code as well.
| for (MutableRecordBatch batch : batches) { | ||
| long maxOffset = -1L; | ||
| BatchRetention batchRetention = filter.checkBatchRetention(batch); | ||
| final long firstTimestamp = filter.retrieveDeleteHorizon(batch); |
There was a problem hiding this comment.
firstTimestamp => deleteHorizonMs?
| // we want to check if the delete horizon has been set or stayed the same | ||
| boolean writeOriginalBatch = true; | ||
| boolean shouldSetDeleteHorizon = !batch.isDeleteHorizonSet() && firstTimestamp != RecordBatch.NO_TIMESTAMP; | ||
| boolean containsTombstones = false; |
There was a problem hiding this comment.
containsTombstones => containsTombstonesOrMarker?
|
@junrao All comments have been addressed. |
junrao
left a comment
There was a problem hiding this comment.
@ConcurrencyPractitioner : Thanks for the updated patch. A few more comments below. Also, could we add a test for the new logic?
| } | ||
|
|
||
| @Override | ||
| public boolean isDeleteHorizonSet() { |
There was a problem hiding this comment.
isDeleteHorizonSet => deleteHorizonSet ?
|
|
||
| @Override | ||
| public long deleteHorizonMs() { | ||
| return loadFullBatch().deleteHorizonMs(); |
There was a problem hiding this comment.
Since this is a legacy record, deleteHorizonMs is never going to be set. It seems we can avoid loading the full batch? Ditto in isDeleteHorizonSet below.
| @Override | ||
| public long deleteHorizonMs() { | ||
| if (isDeleteHorizonSet()) | ||
| return super.loadFullBatch().deleteHorizonMs(); |
There was a problem hiding this comment.
Hmm, why do we need to call loadFullBatch()? Could we just call loadBatchHeader()?
| for (MutableRecordBatch batch : batches) { | ||
| long maxOffset = -1L; | ||
| BatchRetention batchRetention = filter.checkBatchRetention(batch); | ||
| long deleteHorizonMs = filter.retrieveDeleteHorizon(batch); |
There was a problem hiding this comment.
Hmm, it seems that we can just get deleteHorizonMs from batch directly, instead of through filter.
There was a problem hiding this comment.
Yeah, I get what you are saying. Just want to point out that this method is still necessary when the delete horizon is not set.
| val latestOffsetForKey = record.offset() >= foundOffset | ||
| val isRetainedValue = record.hasValue || retainDeletes | ||
| val isLatestVersion = batch.magic() < RecordBatch.MAGIC_VALUE_V2 | ||
| val shouldRetainDeletes = isLatestVersion match { |
There was a problem hiding this comment.
For boolean value, we can just use if/else instead of match/case.
| } else { | ||
| val canDiscardBatch = transactionMetadata.onBatchRead(batch) | ||
| canDiscardBatch | ||
| (canDiscardBatch, false) |
There was a problem hiding this comment.
If a control batch is empty, we need to check whether the batch should be retained based on deleteHorizonMs similar to what's in shouldRetainRecord. We can ignore retainTxnMarkers since a control batch is always in V2.
Also, it's a bit awkward to have to return 2 values. Maybe it's simpler to just return canDiscardControlBatch and let the caller decide how the batch should be retained.
| byte batchMagic = batch.magic(); | ||
| // we want to check if the delete horizon has been set or stayed the same | ||
| boolean writeOriginalBatch = true; | ||
| boolean shouldSetDeleteHorizon = !batch.isDeleteHorizonSet() && deleteHorizonMs != RecordBatch.NO_TIMESTAMP; |
There was a problem hiding this comment.
Hmm, a more intuitive check is probably !batch.isDeleteHorizonSet() && messageFormat >= V2.
| originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(), | ||
| originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(), | ||
| originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit()); | ||
| builder.setDeleteHorizonMs(deleteHorizonMs); |
There was a problem hiding this comment.
Instead of adding a new setDeleteHorizonMs() method, would it be better to just add another constructor for MemoryRecordsBuilder that includes DeleteHorizonMs? This way, it's more consistent with how DeleteHorizonMs is set in another classes.
There was a problem hiding this comment.
@junrao That wouldn't work. Checkstyle prohibits any constructor which have more than 13 parameters. Adding a new one would make it 14. I think that we should do a builder pattern instead?
There was a problem hiding this comment.
We could probably just bump up the number of parameter limit in the checkstyle file to 14.
|
Print statements are temporary. |
|
Cool, so I fixed the majority of the tests. |
|
Retest this please. |
|
@junrao Feel free to leave some comments. I fixed all the failing tests, just need to add some new ones. Will do so today. |
|
Retest this please. |
|
Migrating to #7884 . |
Trying to resolve long running issues we have with transactional marker and tombstone retention.