Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
"about": "Each partition in the request.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1",
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "ignorable": true,
"about": "The current leader epoch." },
{ "name": "Timestamp", "type": "int64", "versions": "0+",
"about": "The current timestamp." },
{ "name": "MaxNumOffsets", "type": "int32", "versions": "0",
{ "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1",
"about": "The maximum number of offsets to report." }
]}
]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,8 @@ private ListOffsetRequest createListOffsetRequest(int version) {
.setPartitions(Arrays.asList(new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L)
.setMaxNumOffsets(10)));
.setMaxNumOffsets(10)
.setCurrentLeaderEpoch(5)));
return ListOffsetRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(Collections.singletonList(topic))
Expand All @@ -1253,18 +1254,18 @@ private ListOffsetRequest createListOffsetRequest(int version) {
.setName("test")
.setPartitions(Arrays.asList(new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L)));
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5)));
return ListOffsetRequest.Builder
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else if (version >= 2 && version <= 5) {
ListOffsetPartition partition = new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L);
if (version >= 4) {
partition.setCurrentLeaderEpoch(5);
}
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to set current leader epoch for all the versions as we have made it ignorable. We set it all the time in the replica fetcher so the test would be aligned with what we do.


ListOffsetTopic topic = new ListOffsetTopic()
.setName("test")
.setPartitions(Arrays.asList(partition));
Expand Down
30 changes: 23 additions & 7 deletions core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val partitionData = response.topics.asScala.find(_.name == topic).get
.partitions.asScala.find(_.partitionIndex == partition.partition).get

(partitionData.offset, partitionData.leaderEpoch)
if (version == 0) {
if (partitionData.oldStyleOffsets().isEmpty)
(-1, partitionData.leaderEpoch)
else
(partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch)
Comment on lines +147 to +150
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could use headOption.getOrElse(-1) and avoid the extra if/else.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getOrElse returns Any type (supertype of Long). I kept the current code changes to avoid any conversions

} else
(partitionData.offset, partitionData.leaderEpoch)
}

@Test
Expand Down Expand Up @@ -174,17 +180,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
}

@Test
def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = {
def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(partition.partition)

TestUtils.generateAndProduceMessages(servers, topic, 10)

assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Now that we test all versions, could we rename to test to testResponseDefaultOffsetAndLeaderEpochForAllVersions in order to stay consistent?

if (version == 0) {
assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
} else if (version >= 1 && version <= 3) {
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
} else if (version >= 4) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
}
}
}

private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {
Expand Down