KAFKA-6834: Handle compaction with batches bigger than max.message.bytes #4953
Conversation
ijuma
left a comment
Thanks for the PR, left a few comments.
This should be CorruptRecordException too.
We only ever invoke this method from nextBatch(), which has already done this check, or when growing log cleaner buffers beyond max.message.bytes. So we don't expect this check to fail, hence IllegalStateException seems better?
We also invoke it from MemoryRecords, no?
We invoke it from MemoryRecords#nextBatchSize() for use in LogCleaner since this class is package-private.
Right, so it seems a bit brittle to assume that some checks have already been done.
Since we have the check in nextBatch already, would it make sense to grab the record size there and pass it to this function as an argument?
I changed the method to return null if there isn't enough data in the buffer, making it consistent with nextBatch. Added comments and test.
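That contract can be sketched stand-alone. This is an illustrative simplification, not the real ByteBufferLogInputStream: the class and method names are assumptions, and the only real detail borrowed from Kafka's format is the 12-byte log overhead (8-byte base offset followed by a 4-byte batch length).

```java
import java.nio.ByteBuffer;

public class BatchSizeSketch {
    // Kafka's log overhead: 8-byte base offset + 4-byte batch length
    static final int LOG_OVERHEAD = 12;

    // Returns the total size of the next batch, or null if the buffer does
    // not yet contain a complete header -- mirroring nextBatch(), which also
    // returns null on insufficient data rather than throwing.
    static Integer nextBatchSize(ByteBuffer buffer) {
        if (buffer.remaining() < LOG_OVERHEAD)
            return null;
        // Absolute getInt: reads the length field without moving position()
        int batchLength = buffer.getInt(buffer.position() + 8);
        return batchLength + LOG_OVERHEAD;
    }

    public static void main(String[] args) {
        ByteBuffer partial = ByteBuffer.allocate(4);   // header incomplete
        System.out.println(nextBatchSize(partial));    // null

        ByteBuffer full = ByteBuffer.allocate(64);
        full.putLong(0L);   // base offset
        full.putInt(40);    // batch length after the header
        full.flip();
        System.out.println(nextBatchSize(full));       // 52
        System.out.println(full.position());           // 0 -- unchanged
    }
}
```

The absolute `getInt` (with an explicit index) is what keeps the buffer's position untouched, which is the property the unit test mentioned above verifies.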
What if the max message size is reduced? Is this check too strict?
We seem to only create ByteBufferLogInputStream with a maxMessageSize of Integer.MAX_VALUE.
But this is a general class, right? It may be used in different ways in the future. We should try to write robust code that makes sense conceptually. If there are assumptions, we should document them and ideally have tests so that we can't break them.
Shouldn't we do something if this evaluates to false?
When invoked from nextBatch, we return null and would do this again when there is sufficient data in the buffer (that was the existing behaviour). For the log cleaner, this should always succeed since we are growing the buffer beyond max.message.bytes.
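The buffer-growing path being discussed can be sketched roughly as follows. The real logic lives in the Scala LogCleaner; this is a hypothetical stand-alone Java version (the method name and doubling strategy are assumptions) that captures the point of the fix: growth is driven by the actual batch size, not capped at max.message.bytes.

```java
import java.nio.ByteBuffer;

public class GrowBufferSketch {
    // Grow the cleaner's read buffer so it can hold one over-sized batch.
    // Batches written under an older, larger max.message.bytes can then
    // still be cleaned after the limit is reduced.
    static ByteBuffer growBufferTo(ByteBuffer buffer, int neededSize) {
        if (buffer.capacity() >= neededSize)
            return buffer; // already big enough
        // Double until the batch fits, capping at exactly neededSize
        // (assumes a non-zero starting capacity)
        int newSize = Math.max(buffer.capacity(), 1);
        while (newSize < neededSize)
            newSize = Math.min(newSize * 2, neededSize);
        return ByteBuffer.allocate(newSize);
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocate(1024);
        System.out.println(growBufferTo(small, 5000).capacity()); // 5000
        System.out.println(growBufferTo(small, 512) == small);    // true
    }
}
```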
We should include the log segment base offset if we can in messages such as this. Also, at this point we have already allocated the batch, so is checking crc even helpful?
I was thinking that if there is a lot of corrupt data in the logs and we managed to allocate a buffer for the first one because the size was small enough to allocate, it may be worth checking CRC to detect corruption and avoid allocating even larger buffers later.
Updated exception message.
Should we be duplicating the buffer?
Also, not really sure we need to expose nextBatchSize. Couldn't we do nextBatch().size()?
We don't change the position in the buffer in nextBatchSize, so duplicate() is not required? The unit test verifies that the position is not changed.
nextBatch() returns null if the buffer is not large enough to hold the batch. nextBatchSize returns the batch size as long as the header is present, so we can allocate the buffer based on this size. Have I misunderstood the comment?
The notion of "next batch" is a little weird for MemoryRecords since it just represents a chunk of messages. Is firstBatchSize the intent?
Good point, changed to firstBatchSize.
Hmm.. should we validate the CRC before growing the buffers? The length is not protected by the CRC of course, but corruption may impact both the length and parts of the batch.
Discussed with @ijuma offline. This probably doesn't make much sense because checking the CRC requires pulling the record into memory in the first place (which requires the length). In that case, I feel like the CRC check on the first batch is just kind of weird. Do we really get a lot of benefit from it? Maybe a simple validation we could do is at least ensure that the size of the batch is not bigger than the remaining size of the segment.
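The simple validation suggested here could look like the following. This is illustrative only: the method name and parameters are assumptions, and the exception type is a stand-in (Kafka would more likely raise CorruptRecordException).

```java
public class BatchSizeCheck {
    // Reject a batch whose claimed size exceeds the bytes remaining in the
    // segment: a corrupted length field would otherwise drive a huge (and
    // pointless) buffer allocation before any CRC check could even run,
    // since checking the CRC requires reading the batch into memory first.
    static void ensureBatchFitsSegment(int batchSize,
                                       long segmentRemainingBytes,
                                       long baseOffset) {
        if (batchSize > segmentRemainingBytes)
            throw new IllegalStateException("Batch of size " + batchSize +
                " at base offset " + baseOffset + " exceeds the " +
                segmentRemainingBytes +
                " bytes left in the segment; data is likely corrupt");
    }

    public static void main(String[] args) {
        ensureBatchFitsSegment(100, 1000, 0L); // fits: no exception
        try {
            ensureBatchFitsSegment(5000, 1000, 42L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Including the base offset in the message follows the earlier review suggestion to identify where in the log the corruption was seen.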
It seems that we need to grow buffers in a similar way in buildOffsetMapForSegment() too?
2e234b1 to 43aa1dc
@ijuma @hachikuji @junrao Thanks for the reviews. I have addressed the comments so far.
hachikuji
left a comment
LGTM, thanks for the fix. Just one comment which does not need re-review.
def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (Log, FakeOffsetMap) = {
  // Create cleaner with very small default max message size
This comment seems misplaced?
@hachikuji Thanks for the review, merging to trunk.
…-record-version

* apache-github/trunk:
  KAFKA-6894: Improve err msg when connecting processor with global store (apache#5000)
  KAFKA-6893; Create processors before starting acceptor in SocketServer (apache#4999)
  MINOR: Fix typo in ConsumerRebalanceListener JavaDoc (apache#4996)
  MINOR: Remove deprecated valueTransformer.punctuate (apache#4993)
  MINOR: Update dynamic broker configuration doc for truststore update (apache#4954)
  KAFKA-6870 Concurrency conflicts in SampledStat (apache#4985)
  KAFKA-6361: Fix log divergence between leader and follower after fast leader fail over (apache#4882)
  KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (apache#4976)
  KAFKA-6878 Switch the order of underlying.init and initInternal (apache#4988)
  KAFKA-6299; Fix AdminClient error handling when metadata changes (apache#4295)
  KAFKA-6878: NPE when querying global state store not in READY state (apache#4978)
  KAFKA 6673: Implemented missing override equals method (apache#4745)
  KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (apache#4953)
…tes (apache#4953)

Grow buffers in log cleaner to hold one message set after sanity check, even if the message set is bigger than max.message.bytes.

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>