KAFKA-806: Index may not always observe log.index.interval.bytes #18012
junrao merged 9 commits into apache:trunk
Conversation
Force-pushed d21b5f6 to e8d2320
Force-pushed a80cc4c to ebfb248
junrao left a comment
@FrankYang0529 : Thanks for the PR. Left a couple of comments.
for (RecordBatch batch : records.batches()) {
    if (bytesSinceLastIndexEntry > indexIntervalBytes &&
            batch.lastOffset() >= offsetIndex().lastOffset()) {
        offsetIndex().append(batch.lastOffset(), physicalPosition);
Currently, every time we append an entry to the offset index, we also append an entry to the timestamp index. It would be useful to keep it that way.
Hi @junrao, thanks for the review. I addressed both comments.
Timestamps are not always monotonic across records, so the timestamp index is used for offset lookups less often than the offset index. We may want to consider whether adding a timestamp entry for each batch is worthwhile, since this operation introduces extra cost.
It's true that the time index is used less frequently. But I am not sure it's worth optimizing since its size is small.
Maybe we can refactor this hot method a bit to reduce buffer accesses and checks.
for (RecordBatch batch : records.batches()) {
var batchMaxTimestamp = batch.maxTimestamp();
var batchLastOffset = batch.lastOffset();
var updateTimeIndex = false;
if (batchMaxTimestamp > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchLastOffset);
updateTimeIndex = true;
}
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(batchLastOffset, physicalPosition);
// max timestamp may not be monotonic, so we need to check it to avoid the time index append error
if (updateTimeIndex) timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
bytesSinceLastIndexEntry = 0;
}
var sizeInBytes = batch.sizeInBytes();
physicalPosition += sizeInBytes;
bytesSinceLastIndexEntry += sizeInBytes;
}

// append an entry to the offset index at batches level (if needed)
for (RecordBatch batch : records.batches()) {
    if (bytesSinceLastIndexEntry > indexIntervalBytes &&
            batch.lastOffset() >= offsetIndex().lastOffset()) {
This is unnecessary. On the leader side, we call LogValidator.validateBatch, which makes sure each batch has at least one record.
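For context, the leader-side guarantee mentioned here, that each validated batch contains at least one record, can be sketched as a trivial check (illustrative only, not the actual LogValidator code):

```java
// Sketch: an empty batch is rejected during leader-side validation, so the
// per-batch indexing loop never observes a batch without records.
class BatchValidationSketch {
    static void validateBatch(int recordCount) {
        if (recordCount < 1)
            throw new IllegalArgumentException("Batch must contain at least one record");
    }
}
```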
Force-pushed ebfb248 to dbed266
Force-pushed b9f1ae1 to e08001e
junrao left a comment
@FrankYang0529 : Thanks for the updated PR. A few more comments.
long batchShallowOffsetOfMaxTimestamp = -1L;
// append an entry to the index at batches level (if needed)
for (RecordBatch batch : records.batches()) {
    if (batch.maxTimestamp() > batchMaxTimestamp) {
Could we get rid of this condition and make batchMaxTimestamp and batchShallowOffsetOfMaxTimestamp a local val in the loop?
long appendedBytes = log.append(records);
LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
// Update the in memory max timestamp and corresponding offset.
if (largestTimestampMs > maxTimestampSoFar()) {
With this change, we can remove both largestTimestampMs and shallowOffsetOfMaxTimestamp. We probably can also remove LogAppendInfo.shallowOffsetOfMaxTimestamp.
Yes, removed largestTimestampMs and shallowOffsetOfMaxTimestamp in LogSegment#append. Also, removed LogAppendInfo.shallowOffsetOfMaxTimestamp because it's not referenced anymore.
offsetIndex().append(batch.lastOffset(), physicalPosition);

// max timestamp may not be monotonic, so we need to check it to avoid the time index append error
if (batchMaxTimestamp >= timeIndex().lastEntry().timestamp)
This is unnecessary since TimeIndex has the same check.
// We only append to the time index when the timestamp is greater than the last inserted timestamp.
// If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
// index will be empty.
if (timestamp > lastEntry.timestamp) {
I think @FrankYang0529 tried to avoid the exception of adding a "smaller" timestamp. see
Yes, as @chia7712 mentioned, if there is an existing timestamp index entry in the file and the new timestamp is less than the old one, TimeIndex#maybeAppend will throw an error.
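The append guard that produces this behavior can be illustrated with a small self-contained sketch (class and method names below are simplified illustrations, not the actual Kafka TimeIndex implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the TimeIndex append rules discussed above:
// strictly larger timestamps are appended, equal ones are silently ignored,
// and smaller ones raise an error.
class SketchTimeIndex {
    record Entry(long timestamp, long offset) {}

    private final List<Entry> entries = new ArrayList<>();

    long lastTimestamp() {
        return entries.isEmpty() ? -1L : entries.get(entries.size() - 1).timestamp();
    }

    void maybeAppend(long timestamp, long offset) {
        long last = lastTimestamp();
        if (timestamp < last)
            throw new IllegalStateException(
                "Timestamp " + timestamp + " is smaller than the last entry " + last);
        if (timestamp > last)
            entries.add(new Entry(timestamp, offset));
        // timestamp == last: ignored, no exception
    }

    int entryCount() { return entries.size(); }
}
```

Under these rules, passing the monotonic maxTimestampSoFar() can never hit the exception branch, whereas a raw per-batch timestamp can.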
// max timestamp may not be monotonic, so we need to check it to avoid the time index append error
if (batchMaxTimestamp >= timeIndex().lastEntry().timestamp)
    timeIndex().maybeAppend(batchMaxTimestamp, shallowOffsetOfMaxTimestampSoFar());
Hmm, we should use maxTimestampSoFar() instead of batchMaxTimestamp, right?
Yes, updated it. Thanks.
Force-pushed e08001e to fd8bbc9
junrao left a comment
@FrankYang0529 : Thanks for the updated PR. A few more comments.
}
bytesSinceLastIndexEntry += records.sizeInBytes();

if (updateRollingBasedTimestamp) rollingBasedTimestamp = OptionalLong.of(recordsLargestTimestampMs);
It's better to assign the timestamp of the first batch in a segment to rollingBasedTimestamp. This way, it's consistent between the leader and the follower.
Since we utilize the timestamp of the first batch in other parts of the code, it is beneficial to maintain consistency by aligning it within this PR. Alternatively, we can keep rollingBasedTimestamp empty and initialize it during the method call.
Agree; for consistency, we can let LogSegment#loadFirstBatchTimestamp handle rollingBasedTimestamp. Removed the related code in the append function.
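The agreed approach, lazily pinning rollingBasedTimestamp to the first batch on first access instead of updating it during append, might look roughly like this (illustrative names and a simulated segment; the real logic lives in LogSegment#loadFirstBatchTimestamp):

```java
import java.util.OptionalLong;

// Sketch: rollingBasedTimestamp stays empty until first read, then is pinned
// to the max timestamp of the FIRST batch in the segment, so a leader and a
// follower (which may append many batches in one call) end up consistent.
class RollingTimestampSketch {
    private OptionalLong rollingBasedTimestamp = OptionalLong.empty();
    private final long[] batchMaxTimestamps; // simulated segment contents

    RollingTimestampSketch(long[] batchMaxTimestamps) {
        this.batchMaxTimestamps = batchMaxTimestamps;
    }

    long loadFirstBatchTimestamp() {
        if (rollingBasedTimestamp.isEmpty() && batchMaxTimestamps.length > 0)
            rollingBasedTimestamp = OptionalLong.of(batchMaxTimestamps[0]);
        return rollingBasedTimestamp.orElse(-1L);
    }
}
```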
new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp,
        RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
        validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression, validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
It's probably better to still split this long statement into multiple lines.
Yes, updated it. Thanks.
seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, "hello", "there"));
seg.append(41,
    MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 40, Compression.NONE, TimestampType.CREATE_TIME,
Why is this change needed? Also, since we no longer support v0/v1 message format. We need to remove all v1Records usage in this test. Should we do it in this PR or a separate one?
In 4.0, we don't support v0/v1 messages from producers. However, do we support a case where a 4.0 broker works with brokers on other versions and the leader node has the old message format? In that case, we may still get the v0/v1 message format.
I think @FrankYang0529 is interested in testing a scenario where a 4.0 follower sends a FetchRequest to a leader that contains v0/v1 messages.
Ok. Why do we change from inserting 2 records to 1 record? Also, could we continue using v1Records?
Why do we change from inserting 2 records to 1 record?
My mistake. Changed it back to 2 records.
could we continue using v1Records?
Probably not. The old test relies on largestTimestampMs to assign rollingBasedTimestamp as RecordBatch.NO_TIMESTAMP. In this PR, we assign the first batch's max timestamp to rollingBasedTimestamp, and v1Records sets the record timestamp to offset * 10, so it's not RecordBatch.NO_TIMESTAMP.
boolean updateTimeIndex = false;
if (batchMaxTimestamp > maxTimestampSoFar()) {
    maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchLastOffset);
    updateTimeIndex = true;
This code seems still not quite right. We need to remember updateTimeIndex once we get a higher timestamp instead of resetting it in every batch.
Yes, removed updateTimeIndex. Thanks.
offsetIndex().append(batchLastOffset, physicalPosition);

// max timestamp may not be monotonic, so we need to check it to avoid the time index append error
if (updateTimeIndex) timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
It seems that we don't need to maintain updateTimeIndex. maxTimestampSoFar() is always >= the last time index entry. If it's equal, it will be ignored in timeIndex().maybeAppend and won't trigger IllegalStateException.
The purpose of updateTimeIndex was to minimize unnecessary calls to maybeAppend by invoking it only when a larger timestamp is encountered, which avoids unneeded locking. But that is probably an unnecessary optimization, so we can remove updateTimeIndex entirely to simplify the code.
expectedMaxTimestampOffset = 2;
break;
}
assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp);
expectedMaxTimestampOffset is no longer used and should be removed.
Removed it. Thanks.
assertEquals(1, segment.offsetIndex().lookup(1L).offset);
assertEquals(2, segment.offsetIndex().lookup(2L).offset);

assertEquals(1, segment.timeIndex().entries());
It seems that we should have two index entries in this test, the first for timestamp 1 and the second for timestamp 2.
assertEquals(1, segment.timeIndex().entries());
assertEquals(0L, segment.timeIndex().lookup(1L).offset);
assertEquals(2L, segment.timeIndex().lookup(2L).offset);
Could we use timeIndex().entry to explicitly fetch each index entry instead of using timeIndex().lookup?
Updated assertion. Thanks.
Force-pushed fd8bbc9 to 80c9ad4
junrao left a comment
@FrankYang0529 : Thanks for the updated PR. A few more comments.
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    offsetIndex().append(batchLastOffset, physicalPosition);
    timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());

assertEquals(2, segment.offsetIndex().lookup(2L).offset);

assertEquals(2, segment.timeIndex().entries());
assertEquals(new TimestampOffset(1, 0), segment.timeIndex().entry(0));
Could we use entry for the offset index and the other test too?
Thanks for the suggestion. I only check the offset field for the offset index, because the position field may look like some kind of magic value. WDYT?
Force-pushed bed1b2b to 717f252
junrao left a comment
@FrankYang0529 : Thanks for the updated PR. LGTM. Are the flaky tests related to this PR?
Hi @junrao, thanks for the review. I checked the failed cases and none of them are related to this PR. I can't reproduce them on my laptop.
Force-pushed 717f252 to 57036cf
Last build: https://github.com/apache/kafka/actions/runs/12784403901/job/35637865171?pr=18012
chia7712 left a comment
@FrankYang0529 thanks for this patch. Please take a look at the following two minor questions.
now,
records,
maxTimestamp,
shallowOffsetOfMaxTimestamp,
shallowOffsetOfMaxTimestamp is unused, so could you please remove it as well?
Removed it. Thanks for the reminder.
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    offsetIndex().append(batchLastOffset, physicalPosition);
    timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
If two batches have the same timestamp, and both timestamps are greater than maxTimestampSoFar, and the first batch is skipped due to indexIntervalBytes, the resulting index for searching the 'earliest offset' of the maximum timestamp using TimestampSpec will be incorrect.
Hmm, in that case, maxTimestampAndOffsetSoFar will point to the offset of the first batch, which will be appended to the time index. This seem correct, right?
You are right, we update maxTimestampAndOffsetSoFar only if batchMaxTimestamp > maxTimestampSoFar() is true. Hence, the offset points to the first batch.
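This invariant, that with a strict greater-than comparison the recorded offset stays on the earliest batch carrying the max timestamp, can be checked with a tiny simulation (illustrative names, not the actual Kafka code):

```java
// Sketch: track the max timestamp seen so far and the shallow offset of the
// FIRST batch that carried it.
class MaxTimestampTracker {
    private long maxTimestamp = -1L;
    private long offsetOfMaxTimestamp = -1L;

    void onBatch(long batchMaxTimestamp, long batchLastOffset) {
        // Strictly greater: a later batch with the same timestamp does not
        // overwrite the recorded offset.
        if (batchMaxTimestamp > maxTimestamp) {
            maxTimestamp = batchMaxTimestamp;
            offsetOfMaxTimestamp = batchLastOffset;
        }
    }

    long maxTimestamp() { return maxTimestamp; }
    long offsetOfMaxTimestamp() { return offsetOfMaxTimestamp; }
}
```

So even if the first batch with the max timestamp is skipped for the offset index due to indexIntervalBytes, the time index entry still points to that first batch's offset.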
Signed-off-by: PoAn Yang <payang@apache.org>
Force-pushed 57036cf to 7b20d92
junrao left a comment
@FrankYang0529 : Thanks for the updated PR. One more minor comment.
MetricsRecorder metricsRecorder) {
long now = time.milliseconds();
long maxTimestamp = RecordBatch.NO_TIMESTAMP;
long shallowOffsetOfMaxTimestamp = -1L;
Could we also remove initialOffset?
Signed-off-by: PoAn Yang <payang@apache.org>
junrao left a comment
@FrankYang0529 : Thanks for the updated PR. LGTM
Can we please update the PR description with what we decided to do here? It's a bit sparse.
Thanks! The last bullet point seems to be missing a word or something.
…che#18012) Currently, each log.append() will add at most 1 index entry, even when the appended data is larger than log.index.interval.bytes. One potential issue is that if a follower restarts after being down for a long time, it may fetch data much bigger than log.index.interval.bytes at a time. This means that fewer index entries are created, which can increase the fetch time from the consumers. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
}
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    offsetIndex().append(largestOffset, physicalPosition);
In scenarios where a new follower is synchronizing from another replica, another issue arises. If the records consist of multiple batches, physicalPosition will reflect the position of the first batch. This leads to an inaccurate index entry, represented as (offset_of_last_batch, position_of_first_batch). This issue can be easily reproduced, and our dump log tool currently displays warnings to indicate the inconsistency.
Mismatches in :/home/chia7712/ikea-0-follower/00000000000000000000.index
Index offset: 2062680, log offset: 2061012
Index offset: 2060984, log offset: 2058974
Index offset: 2058940, log offset: 2056966
Index offset: 2056936, log offset: 2054979
Index offset: 2054950, log offset: 2052962
Index offset: 2052931, log offset: 2050959
Index offset: 2050923, log offset: 2048924
Index offset: 2048895, log offset: 2046967
This drawback is analogous to having fewer index entries, which can lead to increased fetch times. I'm happy that @FrankYang0529 fixes "two" issues with this PR.
However, I'm wondering if we should address this issue in the active 3.x version. We could calculate the accurate position by using the size of the last batch. For example:
int position = records.lastBatch().map(b -> log.sizeInBytes() - b.sizeInBytes()).orElse(physicalPosition);
offsetIndex().append(largestOffset, position);
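The arithmetic behind this suggestion is simply "segment size after the append, minus the size of the last batch, equals the physical start position of the last batch". A minimal sketch with made-up sizes (names are illustrative, not the Kafka API):

```java
// Sketch: three batches of sizes 100, 80, and 60 bytes are appended to a
// segment that already holds 1000 bytes. The last batch starts at
// 1000 + 100 + 80 = 1180, which equals (size after append) - (last batch size).
class PositionSketch {
    static long lastBatchPosition(long segmentSizeAfterAppend, long lastBatchSize) {
        return segmentSizeAfterAppend - lastBatchSize;
    }
}
```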
@chia7712 Thanks for the information. I will create a PR to 3.9 tomorrow.
@FrankYang0529 I open https://issues.apache.org/jira/browse/KAFKA-18759 to trace it for 3.9 and 3.8 - please feel free to take over this jira
@chia7712 : If we want to address the indexing issue in 3.9, it's probably easier to just cherry-pick this PR to 3.9 instead of fixing a subset of the problem separately, right?
@junrao I tried to minimize the changes to 3.9, but you are right that we can backport this PR instead.
@FrankYang0529 WDYT? If you agree to backport this PR, could you please file a PR for 3.9? Thanks.
…che#18012) Currently, each log.append() will add at most 1 index entry, even when the appended data is larger than log.index.interval.bytes. One potential issue is that if a follower restarts after being down for a long time, it may fetch data much bigger than log.index.interval.bytes at a time. This means that fewer index entries are created, which can increase the fetch time from the consumers. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com> (cherry picked from commit e124d39)
Currently, each log.append() will add at most 1 index entry, even when the appended data is larger than log.index.interval.bytes. One potential issue is that if a follower restarts after being down for a long time, it may fetch data much bigger than log.index.interval.bytes at a time. This means that fewer index entries are created, which can increase the fetch time from the consumers.

Changes in this PR:
- Check each batch in MemoryRecords. If the bytes since the last index entry are larger than log.index.interval.bytes, append an offset/timestamp index entry.
- Previously, we assigned rollingBasedTimestamp the largest timestamp in the first MemoryRecords. After this PR, rollingBasedTimestamp is the largest timestamp in the first batch, so the behavior is consistent between leader and follower.
- Remove LogAppendInfo#shallowOffsetOfMaxTimestamp. We iterate over each batch in the append function, so we can get the value from shallowOffsetOfMaxTimestampSoFar.

Committer Checklist (excluded from commit message)