Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
a7d6288
KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a…
chia7712 Mar 28, 2024
bd32d03
Merge branch 'trunk' into KAFKA-16310
chia7712 Mar 28, 2024
0f18cae
Merge branch 'trunk' into KAFKA-16310
chia7712 Mar 29, 2024
2c00cee
add more tests
chia7712 Mar 29, 2024
5278cd1
Merge branch 'trunk' into KAFKA-16310
chia7712 Mar 29, 2024
90d33e0
revert code and add comments
chia7712 Mar 30, 2024
f11b877
Merge branch 'trunk' into KAFKA-16310
chia7712 Mar 30, 2024
5180f2d
adjust comments
chia7712 Mar 30, 2024
79561cd
adjust comments
chia7712 Mar 30, 2024
726de99
adjust comments
chia7712 Mar 30, 2024
29137ae
rename offsetOfMaxTimestampSoFar to shallowOffsetOfMaxTimestampSoFar
chia7712 Mar 30, 2024
1cf8ca4
Merge branch 'trunk' into KAFKA-16310
chia7712 Mar 30, 2024
ac729a6
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 1, 2024
2297b52
apply luke patch
chia7712 Apr 1, 2024
a1fbc6c
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 1, 2024
9505115
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 1, 2024
b4b1332
address comments
chia7712 Apr 1, 2024
609b6ab
address comments
chia7712 Apr 2, 2024
a535095
tweak
chia7712 Apr 2, 2024
3bd1451
fix build
chia7712 Apr 2, 2024
3dca50c
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 2, 2024
8e1878a
fix bug
chia7712 Apr 2, 2024
e78d09a
address comments
chia7712 Apr 2, 2024
f084c89
revise comment
chia7712 Apr 2, 2024
d0f6a9a
fix failed test
chia7712 Apr 2, 2024
7aebd43
address comment
chia7712 Apr 2, 2024
88f8c6a
fix
chia7712 Apr 2, 2024
8a7ed30
return none if no batch
chia7712 Apr 3, 2024
d1fb7b1
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 3, 2024
4785371
address comment
chia7712 Apr 3, 2024
cfb12b6
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 3, 2024
79b5226
address reviews
chia7712 Apr 3, 2024
e6fa99f
give more time to sync
chia7712 Apr 4, 2024
0106386
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 4, 2024
3debf0b
revise test
chia7712 Apr 4, 2024
20fa22f
revise test
chia7712 Apr 4, 2024
b06dfcc
set lastLeader to -1
chia7712 Apr 4, 2024
afa36f4
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 5, 2024
07f1330
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 5, 2024
ace94e4
set log.retention.ms
chia7712 Apr 5, 2024
9af7513
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 6, 2024
8b1005e
address comments
chia7712 Apr 6, 2024
58b5e2f
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 7, 2024
728e7bb
fix failed tests
chia7712 Apr 7, 2024
e21447e
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 7, 2024
9e22f2d
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 7, 2024
fe2d7c9
fix testResponseIncludesLeaderEpoch
chia7712 Apr 7, 2024
2dee703
fix failed test
chia7712 Apr 8, 2024
b0afa6d
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 8, 2024
581242c
address comments
chia7712 Apr 8, 2024
d42fe26
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 9, 2024
d12fce2
Merge branch 'trunk' into KAFKA-16310
chia7712 Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,18 +355,18 @@ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingP
*/
public TimestampAndOffset largestTimestampAfter(int startingPosition) {
long maxTimestamp = RecordBatch.NO_TIMESTAMP;
long offsetOfMaxTimestamp = -1L;
long shallowOffsetOfMaxTimestamp = -1L;
int leaderEpochOfMaxTimestamp = RecordBatch.NO_PARTITION_LEADER_EPOCH;

for (RecordBatch batch : batchesFrom(startingPosition)) {
long timestamp = batch.maxTimestamp();
if (timestamp > maxTimestamp) {
maxTimestamp = timestamp;
offsetOfMaxTimestamp = batch.lastOffset();
shallowOffsetOfMaxTimestamp = batch.lastOffset();
leaderEpochOfMaxTimestamp = batch.partitionLeaderEpoch();
}
}
return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp,
return new TimestampAndOffset(maxTimestamp, shallowOffsetOfMaxTimestamp,
maybeLeaderEpoch(leaderEpochOfMaxTimestamp));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);

MemoryRecordsBuilder.RecordsInfo info = builder.info();
filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.offsetOfMaxTimestamp,
filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
maxOffset, retainedRecords.size(), filteredBatchSize);
}
}
Expand Down Expand Up @@ -399,7 +399,7 @@ public static class FilterResult {
private int bytesRetained = 0;
private long maxOffset = -1L;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
private long offsetOfMaxTimestamp = -1L;
private long shallowOffsetOfMaxTimestamp = -1L;

private FilterResult(ByteBuffer outputBuffer) {
this.outputBuffer = outputBuffer;
Expand All @@ -411,21 +411,21 @@ private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int n
retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
}

private void updateRetainedBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset,
private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
int messagesRetained, int bytesRetained) {
validateBatchMetadata(maxTimestamp, offsetOfMaxTimestamp, maxOffset);
validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
if (maxTimestamp > this.maxTimestamp) {
this.maxTimestamp = maxTimestamp;
this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
}
this.maxOffset = Math.max(maxOffset, this.maxOffset);
this.messagesRetained += messagesRetained;
this.bytesRetained += bytesRetained;
}

private void validateBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset) {
if (maxTimestamp != RecordBatch.NO_TIMESTAMP && offsetOfMaxTimestamp < 0)
throw new IllegalArgumentException("offset undefined for maximum timestamp " + maxTimestamp);
private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) {
if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0)
throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp);
if (maxOffset < 0)
throw new IllegalArgumentException("maxOffset undefined");
}
Expand Down Expand Up @@ -458,8 +458,8 @@ public long maxTimestamp() {
return maxTimestamp;
}

public long offsetOfMaxTimestamp() {
return offsetOfMaxTimestamp;
public long shallowOffsetOfMaxTimestamp() {
return shallowOffsetOfMaxTimestamp;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,25 +240,54 @@ public MemoryRecords build() {
return builtRecords;
}


/**
* Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
 * Note: The semantics of the offset of max timestamp is the first offset with the max timestamp if there are multiple records having the same timestamp.
*
* If the log append time is used, the offset will be the first offset of the record.
*
* If create time is used, the offset will always be the offset of the record with the max timestamp.
*
* If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
*
* @return The max timestamp and its offset
* There are three cases of finding max timestamp to return:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @junrao I rewrite whole comments to list all cases. please take a look at it, thanks!

* 1) version 0: The max timestamp is NO_TIMESTAMP (-1)
* 2) LogAppendTime: All records have same timestamp, and so the max timestamp is equal to logAppendTime
* 3) CreateTime: The max timestamp of record
* <p>
* Let's talk about OffsetOfMaxTimestamp. There are some paths that we don't try to find the OffsetOfMaxTimestamp
* to avoid expensive records iteration. Those paths include follower append and index recovery. In order to
* avoid inconsistent time index, we let all paths find shallowOffsetOfMaxTimestamp instead of OffsetOfMaxTimestamp.
* <p>
 * Let's define the shallowOffsetOfMaxTimestamp: It is the last offset of the batch having the max timestamp. If there are
 * multiple batches having the same max timestamp, we pick the earliest batch.
* <p>
* There are five cases of finding shallowOffsetOfMaxTimestamp to return:
 * 1) version 0: It is always -1
* 2) LogAppendTime with single batch: It is the offset of last record
 * 3) LogAppendTime with many single-record batches: Those single-record batches have the same max timestamp, so we return
 *    the base offset, which is equal to the last offset of the earliest batch
* 4) CreateTime with single batch: We return offset of last record to follow the spec we mentioned above. Of course,
* we do have the OffsetOfMaxTimestamp for this case, but we want to make all paths
* find the shallowOffsetOfMaxTimestamp rather than offsetOfMaxTimestamp
 * 5) CreateTime with many single-record batches: Each batch is composed of a single record, and hence offsetOfMaxTimestamp
 *    is equal to the last offset of the earliest batch with the max timestamp
*/
public RecordsInfo info() {
if (timestampType == TimestampType.LOG_APPEND_TIME) {
return new RecordsInfo(logAppendTime, baseOffset);
if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
// maxTimestamp => case 2
// shallowOffsetOfMaxTimestamp => case 2
return new RecordsInfo(logAppendTime, lastOffset);
else
// maxTimestamp => case 2
// shallowOffsetOfMaxTimestamp => case 3
return new RecordsInfo(logAppendTime, baseOffset);
} else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
// maxTimestamp => case 1
// shallowOffsetOfMaxTimestamp => case 1
return new RecordsInfo(RecordBatch.NO_TIMESTAMP, -1);
} else {
// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
// If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
// maxTimestamp => case 3
// shallowOffsetOfMaxTimestamp => case 4
return new RecordsInfo(maxTimestamp, lastOffset);
else
// maxTimestamp => case 3
// shallowOffsetOfMaxTimestamp => case 5
return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
}
}

Expand Down Expand Up @@ -849,12 +878,12 @@ private long nextSequentialOffset() {

public static class RecordsInfo {
public final long maxTimestamp;
public final long offsetOfMaxTimestamp;
public final long shallowOffsetOfMaxTimestamp;

public RecordsInfo(long maxTimestamp,
long offsetOfMaxTimestamp) {
long shallowOffsetOfMaxTimestamp) {
this.maxTimestamp = maxTimestamp;
this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Optional;
import java.util.OptionalLong;

/**
Expand Down Expand Up @@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable<Record> {
* @return Whether this is a batch containing control records
*/
boolean isControlBatch();

/**
 * Iterates all records to find the offset of the max timestamp.
 * Notes:
 * 1) the earliest offset is returned if there are multiple records having the same (max) timestamp
* 2) it always returns None if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0}
* @return offset of max timestamp
*/
default Optional<Long> offsetOfMaxTimestamp() {
if (magic() == RecordBatch.MAGIC_VALUE_V0) return Optional.empty();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao the short-circuit is added

long maxTimestamp = maxTimestamp();
try (CloseableIterator<Record> iter = streamingIterator(BufferSupplier.create())) {
while (iter.hasNext()) {
Record record = iter.next();
if (maxTimestamp == record.timestamp()) return Optional.of(record.offset());
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,11 @@ public void buildUsingLogAppendTime(Args args) {

MemoryRecordsBuilder.RecordsInfo info = builder.info();
assertEquals(logAppendTime, info.maxTimestamp);
// When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
assertEquals(0L, info.offsetOfMaxTimestamp);

if (args.compressionType == CompressionType.NONE && magic <= MAGIC_VALUE_V1)
assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
else
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);

for (RecordBatch batch : records.batches()) {
if (magic == MAGIC_VALUE_V0) {
Expand Down Expand Up @@ -413,10 +416,11 @@ public void buildUsingCreateTime(Args args) {
}

if (magic == MAGIC_VALUE_V0)
// in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
assertEquals(-1L, info.offsetOfMaxTimestamp);
assertEquals(-1, info.shallowOffsetOfMaxTimestamp);
else if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1)
assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
else
assertEquals(1L, info.offsetOfMaxTimestamp);
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);

int i = 0;
long[] expectedTimestamps = new long[] {0L, 2L, 1L};
Expand Down Expand Up @@ -494,11 +498,11 @@ public void writePastLimit(Args args) {

MemoryRecordsBuilder.RecordsInfo info = builder.info();
if (magic == MAGIC_VALUE_V0) {
assertEquals(-1, info.shallowOffsetOfMaxTimestamp);
assertEquals(-1, info.maxTimestamp);
assertEquals(-1L, info.offsetOfMaxTimestamp);
} else {
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
assertEquals(2L, info.maxTimestamp);
assertEquals(2L, info.offsetOfMaxTimestamp);
}

long i = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
assertEquals(0, filterResult.messagesRetained());
assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
assertEquals(12, filterResult.maxTimestamp());
assertEquals(baseOffset + 1, filterResult.offsetOfMaxTimestamp());
assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp());

// Verify filtered records
filtered.flip();
Expand Down Expand Up @@ -413,7 +413,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
assertEquals(0, filterResult.messagesRetained());
assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
assertEquals(timestamp, filterResult.maxTimestamp());
assertEquals(baseOffset, filterResult.offsetOfMaxTimestamp());
assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
assertTrue(filterResult.outputBuffer().position() > 0);

// Verify filtered records
Expand Down Expand Up @@ -893,7 +893,10 @@ public void testFilterTo(Args args) {
assertEquals(filtered.limit(), result.bytesRetained());
if (magic > RecordBatch.MAGIC_VALUE_V0) {
assertEquals(20L, result.maxTimestamp());
assertEquals(4L, result.offsetOfMaxTimestamp());
if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
else
assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
}

MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,8 @@ class LocalLog(@volatile private var _dir: File,
}
}

private[log] def append(lastOffset: Long, largestTimestamp: Long, offsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
segments.activeSegment.append(lastOffset, largestTimestamp, offsetOfMaxTimestamp, records)
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
updateLogEndOffset(lastOffset + 1)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ private[log] class Cleaner(val id: Int,
val retained = MemoryRecords.readableRecords(outputBuffer)
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
dest.append(result.maxOffset, result.maxTimestamp, result.offsetOfMaxTimestamp, retained)
dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained)
throttler.maybeThrottle(outputBuffer.limit())
}

Expand Down
Loading